/* Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt
   SPDX-License-Identifier: GPL-3.0-only
   Authors: Pierre-Alain Loizeau [committer] */

/**
* CbmDeviceMcbmEventBuilderWin.cxx
*
* @since 2020-05-24
* @author P.-A. Loizeau
*/
#include "CbmDeviceMcbmEventBuilderWin.h"

/// CBM headers
#include "CbmEvent.h"
#include "CbmFlesCanvasTools.h"
#include "CbmMatch.h"

/// FAIRROOT headers
#include "FairMQLogger.h"
#include "FairParGenericSet.h"

#include "BoostSerializer.h"
#include "RootSerializer.h"

/// FAIRSOFT headers (geant, boost, ...)
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"

#include <boost/archive/binary_iarchive.hpp>

/// C/C++ headers
#include <array>
#include <stdexcept>
/// Exception type used to abort InitTask(); it carries a human readable reason
/// and is a std::runtime_error so generic handlers can still catch it.
struct InitTaskError : public std::runtime_error {
  /// Build the error from a std::string message
  explicit InitTaskError(const std::string& sReason) : std::runtime_error(sReason) {}
  /// Build the error from a C-string message
  explicit InitTaskError(const char* sReason) : std::runtime_error(sReason) {}
};

using namespace std;
//Bool_t bMcbm2018MonitorTaskBmonResetHistos = kFALSE;

/// Default constructor: creates the time-window event builder algorithm instance
/// which is configured later in InitTask().
CbmDeviceMcbmEventBuilderWin::CbmDeviceMcbmEventBuilderWin()
{
  fpAlgo = new CbmMcbm2019TimeWinEventBuilderAlgo();
}

Pierre-Alain Loizeau
committed
void CbmDeviceMcbmEventBuilderWin::InitTask()
try {
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/// Read options from executable
LOG(info) << "Init options for CbmDeviceMcbmEventBuilderWin.";
fbFillHistos = fConfig->GetValue<bool>("FillHistos");
fbIgnoreTsOverlap = fConfig->GetValue<bool>("IgnOverMs");
fsEvtOverMode = fConfig->GetValue<std::string>("EvtOverMode");
fsRefDet = fConfig->GetValue<std::string>("RefDet");
fvsAddDet = fConfig->GetValue<std::vector<std::string>>("AddDet");
fvsDelDet = fConfig->GetValue<std::vector<std::string>>("DelDet");
fvsSetTrigWin = fConfig->GetValue<std::vector<std::string>>("SetTrigWin");
fvsSetTrigMinNb = fConfig->GetValue<std::vector<std::string>>("SetTrigMinNb");
fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
fsChannelNameDataOutput = fConfig->GetValue<std::string>("EvtNameOut");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
fsAllowedChannels[0] = fsChannelNameDataInput;
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDeviceMcbmEventBuilderWin::HandleData);
} // if( entry.first.find( "ts" )
} // for( auto const &entry : fChannels )
// InitContainers();
/// FIXME: Disable clang formatting for now as it corrupts all alignment
/* clang-format off */
/// Initialize the Algorithm parameters
fpAlgo->SetFillHistos(fbFillHistos);
fpAlgo->SetIgnoreTsOverlap(fbIgnoreTsOverlap);
/// Extract Event Overlap Mode
EOverlapMode mode =
("NoOverlap" == fsEvtOverMode ? EOverlapMode::NoOverlap
: ("MergeOverlap" == fsEvtOverMode ? EOverlapMode::MergeOverlap
: ("AllowOverlap" == fsEvtOverMode ? EOverlapMode::AllowOverlap
: EOverlapMode::NoOverlap)));
fpAlgo->SetEventOverlapMode(mode);
/// Extract refdet
EventBuilderDetector refDet = ("kBmon" == fsRefDet ? kEventBuilderDetBmon
: ("kSts" == fsRefDet ? kEventBuilderDetSts
: ("kMuch" == fsRefDet ? kEventBuilderDetMuch
: ("kTrd" == fsRefDet ? kEventBuilderDetTrd
: ("kTof" == fsRefDet ? kEventBuilderDetTof
: ("kRich" == fsRefDet ? kEventBuilderDetRich
: ("kPsd" == fsRefDet ? kEventBuilderDetPsd
: kEventBuilderDetUndef)))))));
if (kEventBuilderDetUndef != refDet) {
fpAlgo->SetReferenceDetector(refDet);
} // if( kEventBuilderDetUndef != refDet )
else {
LOG(info) << "CbmDeviceMcbmEventBuilderWin::InitTask => Trying to change "
"reference to unsupported detector, ignored! "
<< fsRefDet;
} // else of if( kEventBuilderDetUndef != refDet
/// Extract detector to add if any
for (std::vector<std::string>::iterator itStrAdd = fvsAddDet.begin();
itStrAdd != fvsAddDet.end();
++itStrAdd) {
EventBuilderDetector addDet = ("kBmon" == *itStrAdd ? kEventBuilderDetBmon
: ("kSts" == *itStrAdd ? kEventBuilderDetSts
: ("kMuch" == *itStrAdd ? kEventBuilderDetMuch
: ("kTrd" == *itStrAdd ? kEventBuilderDetTrd
: ("kTof" == *itStrAdd ? kEventBuilderDetTof
: ("kRich" == *itStrAdd ? kEventBuilderDetRich
: ("kPsd" == *itStrAdd ? kEventBuilderDetPsd
: kEventBuilderDetUndef)))))));
if (kEventBuilderDetUndef != addDet) {
} // if( kEventBuilderDetUndef != addDet )
else {
LOG(info) << "CbmDeviceMcbmEventBuilderWin::InitTask => Trying to add "
"unsupported detector, ignored! "
<< (*itStrAdd);
continue;
} // else of if( kEventBuilderDetUndef != addDet )
} // for( std::vector< std::string >::iterator itStrAdd = fvsAddDet.begin(); itStrAdd != fvsAddDet.end(); ++itStrAdd )
/// Extract detector to remove if any
for (std::vector<std::string>::iterator itStrRem = fvsDelDet.begin();
itStrRem != fvsDelDet.end();
++itStrRem) {
EventBuilderDetector remDet = ("kBmon" == *itStrRem ? kEventBuilderDetBmon
: ("kSts" == *itStrRem ? kEventBuilderDetSts
: ("kMuch" == *itStrRem ? kEventBuilderDetMuch
: ("kTrd" == *itStrRem ? kEventBuilderDetTrd
: ("kTof" == *itStrRem ? kEventBuilderDetTof
: ("kRich" == *itStrRem ? kEventBuilderDetRich
: ("kPsd" == *itStrRem ? kEventBuilderDetPsd
: kEventBuilderDetUndef)))))));
if (kEventBuilderDetUndef != remDet) {
} // if( kEventBuilderDetUndef != remDet )
else {
LOG(info) << "CbmDeviceMcbmEventBuilderWin::InitTask => Trying to remove "
"unsupported detector, ignored! "
<< (*itStrRem);
continue;
} // else of if( kEventBuilderDetUndef != remDet )
} // for( std::vector< std::string >::iterator itStrAdd = fvsAddDet.begin(); itStrAdd != fvsAddDet.end(); ++itStrAdd )
/// Extract Trigger window to add if any
for (std::vector<std::string>::iterator itStrTrigWin = fvsSetTrigWin.begin();
itStrTrigWin != fvsSetTrigWin.end();
++itStrTrigWin) {
size_t charPosDel = (*itStrTrigWin).find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceMcbmEventBuilderWin::InitTask => "
<< "Trying to set trigger window with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found "
<< (*itStrTrigWin) << " )";
continue;
} // if( std::string::npos == charPosDel )
/// Detector Enum Tag
std::string sSelDet = (*itStrTrigWin).substr(0, charPosDel);
ECbmModuleId selDet = ("kBmon" == sSelDet ? ECbmModuleId::kBmon
: ("kSts" == sSelDet ? ECbmModuleId::kSts
: ("kMuch" == sSelDet ? ECbmModuleId::kMuch
: ("kTrd" == sSelDet ? ECbmModuleId::kTrd
: ("kTof" == sSelDet ? ECbmModuleId::kTof
: ("kRich" == sSelDet ? ECbmModuleId::kRich
: ("kPsd" == sSelDet ? ECbmModuleId::kPsd
: ECbmModuleId::kNotExist)))))));
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
if (ECbmModuleId::kNotExist == selDet) {
LOG(info)
<< "CbmDeviceMcbmEventBuilderWin::InitTask => "
<< "Trying to set trigger window for unsupported detector, ignored! "
<< sSelDet;
continue;
} // if( ECbmModuleId::kNotExist == selDet )
/// Window beginning
charPosDel++;
std::string sNext = (*itStrTrigWin).substr(charPosDel);
charPosDel = sNext.find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceMcbmEventBuilderWin::InitTask => "
<< "Trying to set trigger window with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found "
<< (*itStrTrigWin) << " )";
continue;
} // if( std::string::npos == charPosDel )
Double_t dWinBeg = std::stod(sNext.substr(0, charPosDel));
/// Window end
charPosDel++;
Double_t dWinEnd = std::stod(sNext.substr(charPosDel));
fpAlgo->SetTriggerWindow(selDet, dWinBeg, dWinEnd);
} // for( std::vector< std::string >::iterator itStrTrigWin = fvsSetTrigWin.begin(); itStrTrigWin != fvsSetTrigWin.end(); ++itStrTrigWin )
/// Extract MinNb for trigger if any
for (std::vector<std::string>::iterator itStrMinNb = fvsSetTrigMinNb.begin();
itStrMinNb != fvsSetTrigMinNb.end();
++itStrMinNb) {
size_t charPosDel = (*itStrMinNb).find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceMcbmEventBuilderWin::InitTask => "
<< "Trying to set trigger min Nb with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,uMinNb but instead found " << (*itStrMinNb)
<< " )";
continue;
} // if( std::string::npos == charPosDel )
/// Detector Enum Tag
std::string sSelDet = (*itStrMinNb).substr(0, charPosDel);
ECbmModuleId selDet = ("kBmon" == sSelDet ? ECbmModuleId::kBmon
: ("kSts" == sSelDet ? ECbmModuleId::kSts
: ("kMuch" == sSelDet ? ECbmModuleId::kMuch
: ("kTrd" == sSelDet ? ECbmModuleId::kTrd
: ("kTof" == sSelDet ? ECbmModuleId::kTof
: ("kRich" == sSelDet ? ECbmModuleId::kRich
: ("kPsd" == sSelDet ? ECbmModuleId::kPsd
: ECbmModuleId::kNotExist)))))));
if (ECbmModuleId::kNotExist == selDet) {
LOG(info)
<< "CbmDeviceMcbmEventBuilderWin::InitTask => "
<< "Trying to set trigger min Nb for unsupported detector, ignored! "
<< sSelDet;
continue;
} // if( ECbmModuleId::kNotExist == selDet )
/// Min number
charPosDel++;
UInt_t uMinNb = std::stoul((*itStrMinNb).substr(charPosDel));
fpAlgo->SetTriggerMinNumber(selDet, uMinNb);
} // for( std::vector< std::string >::iterator itStrMinNb = fvsSetTrigMinNb.begin(); itStrMinNb != fvsSetTrigMinNb.end(); ++itStrMinNb )
/// FIXME: Re-enable clang formatting after formatted lines
/* clang-format on */
fvDigiSts = new std::vector<CbmStsDigi>();
fvDigiMuch = new std::vector<CbmMuchBeamTimeDigi>();
fvDigiTrd = new std::vector<CbmTrdDigi>();
fvDigiTof = new std::vector<CbmTofDigi>();
fvDigiRich = new std::vector<CbmRichDigi>();
fvDigiPsd = new std::vector<CbmPsdDigi>();
/// Register all input data members with the FairRoot manager
fpRun = new FairRunOnline(0);
FairRootManager* ioman = nullptr;
ioman = FairRootManager::Instance();
if (NULL == ioman) { throw InitTaskError("No FairRootManager instance"); }
fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
if (NULL == fTimeSliceMetaDataArray) { throw InitTaskError("Failed creating the TS meta data TClonesarray "); }
ioman->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kFALSE);
ioman->RegisterAny("BmonDigi", fvDigiBmon, kFALSE);
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
ioman->RegisterAny("StsDigi", fvDigiSts, kFALSE);
ioman->RegisterAny("MuchBeamTimeDigi", fvDigiMuch, kFALSE);
ioman->RegisterAny("TrdDigi", fvDigiTrd, kFALSE);
ioman->RegisterAny("TofDigi", fvDigiTof, kFALSE);
ioman->RegisterAny("RichDigi", fvDigiRich, kFALSE);
ioman->RegisterAny("PsdDigi", fvDigiPsd, kFALSE);
/// Feint to avoid crash of DigiManager due to missing source pointer
/// validity check in FairRootManager.h at line 461
std::vector<CbmMvdDigi>* pMvdDigi = new std::vector<CbmMvdDigi>();
ioman->RegisterAny("MvdDigi", pMvdDigi, kFALSE);
std::vector<CbmMatch>* pFakeMatch = new std::vector<CbmMatch>();
ioman->RegisterAny("MvdDigiMatch", pFakeMatch, kFALSE);
ioman->RegisterAny("StsDigiMatch", pFakeMatch, kFALSE);
ioman->RegisterAny("MuchBeamTimeDigiMatch", pFakeMatch, kFALSE);
ioman->RegisterAny("TrdDigiMatch", pFakeMatch, kFALSE);
ioman->RegisterAny("TofDigiMatch", pFakeMatch, kFALSE);
ioman->RegisterAny("RichDigiMatch", pFakeMatch, kFALSE);
ioman->RegisterAny("PsdDigiMatch", pFakeMatch, kFALSE);
/// Create output TClonesArray
/// TODO: remove TObject from CbmEvent and switch to vectors!
fEvents = new TClonesArray("CbmEvent", 500);
/// Now that everything is set, initialize the Algorithm
if (kFALSE == fpAlgo->InitAlgo()) {
throw InitTaskError("Failed to initilize the algorithm class.");
} // if( kFALSE == fpAlgo->InitAlgo() )
/// Histograms management
if (kTRUE == fbFillHistos) {
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fpAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector<std::pair<TCanvas*, std::string>> vCanvases = fpAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add(vHistos[uHisto].first);
std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
fvpsHistosFolder.push_back(psHistoConfig);
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, psHistoConfig);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig);
/// Send message to the common histogram config messages queue
if (Send(messageHist, fsChannelNameHistosConfig) < 0) {
throw InitTaskError("Problem sending histo config");
} // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[uCanv].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
fvpsCanvasConfig.push_back(psCanvConfig);
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, psCanvConfig);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig);
/// Send message to the common canvas config messages queue
if (Send(messageCan, fsChannelNameCanvasConfig) < 0) {
throw InitTaskError("Problem sending canvas config");
} // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
} // if( kTRUE == fbFillHistos )
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);

Pierre-Alain Loizeau
committed
}
/// Check whether a channel name contains one of the allowed channel name patterns.
/// @param channelName  name of the channel as defined in the topology
/// @return true as soon as one entry of fsAllowedChannels is found inside channelName,
///         false if none of them matches
bool CbmDeviceMcbmEventBuilderWin::IsChannelNameAllowed(std::string channelName)
{
  for (std::size_t idx = 0; idx < fsAllowedChannels.size(); ++idx) {
    const std::string& entry = fsAllowedChannels[idx];
    // Substring match: the topology may decorate the base name, e.g. with an index
    if (std::string::npos != channelName.find(entry)) {
      LOG(info) << "Found " << entry << " in " << channelName;
      LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
      return true;
    }
  }
  LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
  // The non-matching path must return a value too (it was missing in the reviewed source)
  return false;
}
/*
Bool_t CbmDeviceMcbmEventBuilderWin::InitContainers()
{
LOG(info) << "Init parameter containers for CbmDeviceMcbmEventBuilderWin.";
if( kFALSE == InitParameters( fpAlgo ->GetParList() ) )
return kFALSE;
/// Need to add accessors for all options
fpAlgo ->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
Bool_t initOK = fpAlgo->InitContainers();
// Bool_t initOK = fMonitorAlgo->ReInitContainers();
return initOK;
}
Bool_t CbmDeviceMcbmEventBuilderWin::InitParameters( TList* fParCList )
{
for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
{
FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
fParCList->Remove( tempObj );
std::string paramName{ tempObj->GetName() };
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Here must come the proper run ID
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req( NewSimpleMessage(message) );
FairMQMessagePtr rep( NewMessage() );
FairParGenericSet* newObj = nullptr;
if( Send(req, "parameters") > 0 )
{
if( Receive( rep, "parameters" ) >= 0)
{
if( 0 != rep->GetSize() )
{
CbmMqTMessage tmsg( rep->GetData(), rep->GetSize() );
newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
LOG( info ) << "Received unpack parameter from the server:";
newObj->print();
} // if( 0 != rep->GetSize() )
else
{
LOG( error ) << "Received empty reply. Parameter not available";
return kFALSE;
} // else of if( 0 != rep->GetSize() )
} // if( Receive( rep, "parameters" ) >= 0)
} // if( Send(req, "parameters") > 0 )
fParCList->AddAt( newObj, iparC );
delete tempObj;
} // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
return kTRUE;
}
*/
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMcbmEventBuilderWin::HandleData(FairMQParts& parts, int /*index*/)
{
LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts"
if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
/// Extract unpacked data from input message
uint32_t uPartIdx = 0;
/// TS metadata
/// TODO: code order of vectors in the TS MetaData!!
/*

Pierre-Alain Loizeau
committed
std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
( parts.At( uPartIdx ) )->GetSize() );
std::istringstream issTsMeta(msgStrTsMeta);
boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
inputArchiveTsMeta >> (*fTsMetaData);
++uPartIdx;
*/
// Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
RootSerializer().Deserialize(*parts.At(uPartIdx), fTsMetaData);
/// FIXME: Not if this is the proper way to insert the data
new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()
// ] ) TimesliceMetaData( *fTsMetaData ) ;
]) TimesliceMetaData(std::move(*fTsMetaData));
++uPartIdx;

Pierre-Alain Loizeau
committed
std::string msgStrBmon(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issBmon(msgStrBmon);
boost::archive::binary_iarchive inputArchiveBmon(issBmon);
inputArchiveBmon >> *fvDigiBmon;

Pierre-Alain Loizeau
committed
std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issSts(msgStrSts);
boost::archive::binary_iarchive inputArchiveSts(issSts);
inputArchiveSts >> *fvDigiSts;
++uPartIdx;

Pierre-Alain Loizeau
committed
std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issMuch(msgStrMuch);
boost::archive::binary_iarchive inputArchiveMuch(issMuch);
inputArchiveMuch >> *fvDigiMuch;
++uPartIdx;

Pierre-Alain Loizeau
committed
std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issTrd(msgStrTrd);
boost::archive::binary_iarchive inputArchiveTrd(issTrd);
inputArchiveTrd >> *fvDigiTrd;
++uPartIdx;
std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issTof(msgStrTof);
boost::archive::binary_iarchive inputArchiveTof(issTof);
inputArchiveTof >> *fvDigiTof;
++uPartIdx;
/// RICH
std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issRich(msgStrRich);
boost::archive::binary_iarchive inputArchiveRich(issRich);
inputArchiveRich >> *fvDigiRich;
++uPartIdx;
/// PSD
std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issPsd(msgStrPsd);
boost::archive::binary_iarchive inputArchivePsd(issPsd);
inputArchivePsd >> *fvDigiPsd;
++uPartIdx;

Pierre-Alain Loizeau
committed
/// Call Algo ProcessTs method
fpAlgo->ProcessTs();
/// Send events vector to ouput
if (!SendEvents(parts)) return false;
/// Clear metadata
fTimeSliceMetaDataArray->Clear();
// delete fTsMetaData;
/// Clear vectors
fvDigiSts->clear();
fvDigiMuch->clear();
fvDigiTrd->clear();
fvDigiTof->clear();
fvDigiRich->clear();
fvDigiPsd->clear();
/// Clear event vector after usage
fpAlgo->ClearEventVector();
fEvents->Clear("C");
// fEvents->Clear();
/// Histograms management
if (kTRUE == fbFillHistos) {
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
|| (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
} // if( kTRUE == fbFillHistos )
return true;

Pierre-Alain Loizeau
committed
}
/// Copy the events built by the algorithm into the output TClonesArray, serialize it and
/// send it appended to the parts of the input message on the data output channel.
/// @param partsIn  input multi-part message; its parts are moved into the outgoing message,
///                 so it must not be used by the caller afterwards
/// @return false if sending fails, true otherwise
bool CbmDeviceMcbmEventBuilderWin::SendEvents(FairMQParts& partsIn)
{
  /// Clear events TClonesArray before usage.
  fEvents->Delete();

  /// Get vector from algo; bound by const reference so the pointer vector is not copied
  const std::vector<CbmEvent*>& vEvents = fpAlgo->GetEventVector();

  /// Move CbmEvent from temporary vector to TClonesArray
  for (CbmEvent* event : vEvents) {
    if (nullptr == event) continue;  // nothing to transfer for an empty slot
    LOG(debug) << "Vector: " << event->ToString();
    new ((*fEvents)[fEvents->GetEntriesFast()]) CbmEvent(std::move(*event));
    LOG(debug) << "TClonesArray: " << static_cast<CbmEvent*>(fEvents->At(fEvents->GetEntriesFast() - 1))->ToString();
  }

  /// Serialize the array of events into a single MQ message
  FairMQMessagePtr message(NewMessage());
  RootSerializer().Serialize(*message, fEvents);

  /// Add it at the end of the input composed message; the input parts are moved, not copied
  FairMQParts partsOut(std::move(partsIn));
  partsOut.AddPart(std::move(message));

  /// Send message
  if (Send(partsOut, fsChannelNameDataOutput) < 0) {
    LOG(error) << "Problem sending data to " << fsChannelNameDataOutput;
    return false;
  }

  return true;
}

bool CbmDeviceMcbmEventBuilderWin::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )

Pierre-Alain Loizeau
committed
/// Reset the histograms after sending them (but do not reset the time)
fpAlgo->ResetHistograms(kFALSE);

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
}
/// Destructor: empties the input buffers and releases the objects owned by the device.
/// Most pointers are only set in InitTask(), so each one is checked before use in case
/// the device is destroyed without having been initialized
/// (assumes the header initializes them to nullptr - TODO confirm).
CbmDeviceMcbmEventBuilderWin::~CbmDeviceMcbmEventBuilderWin()
{
  /// Clear metadata
  if (nullptr != fTimeSliceMetaDataArray) { fTimeSliceMetaDataArray->Clear(); }
  delete fTsMetaData;

  /// Clear vectors
  // NOTE(review): the vectors themselves and fTimeSliceMetaDataArray are registered with the
  // FairRootManager in InitTask(); they are not deleted here until their ownership is confirmed.
  if (nullptr != fvDigiBmon) { fvDigiBmon->clear(); }
  if (nullptr != fvDigiSts) { fvDigiSts->clear(); }
  if (nullptr != fvDigiMuch) { fvDigiMuch->clear(); }
  if (nullptr != fvDigiTrd) { fvDigiTrd->clear(); }
  if (nullptr != fvDigiTof) { fvDigiTof->clear(); }
  if (nullptr != fvDigiRich) { fvDigiRich->clear(); }
  if (nullptr != fvDigiPsd) { fvDigiPsd->clear(); }

  /// Release the output events array, which is created and used only by this device
  if (nullptr != fEvents) {
    fEvents->Delete();
    delete fEvents;
  }

  /// Release the algorithm created in the constructor
  delete fpAlgo;
}