Newer
Older

Pierre-Alain Loizeau
committed
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau[committer] */
/**
* CbmDeviceBuildDigiEvents.cxx
*
* @since 2021-11-18
* @author P.-A. Loizeau
*/
#include "CbmDeviceBuildDigiEvents.h"
/// CBM headers

Pierre-Alain Loizeau
committed
#include "CbmDigiEvent.h"

Pierre-Alain Loizeau
committed
#include "CbmEvent.h"
#include "CbmFlesCanvasTools.h"
#include "CbmMQDefs.h"
#include "CbmMatch.h"
#include "CbmMvdDigi.h"
#include "CbmTsEventHeader.h"

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include "TimesliceMetaData.h"
/// FAIRROOT headers
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "FairRunOnline.h"
#include "BoostSerializer.h"
#include "RootSerializer.h"
/// FAIRSOFT headers (geant, boost, ...)
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>
/// C/C++ headers
#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
using namespace std;
CbmDeviceBuildDigiEvents::CbmDeviceBuildDigiEvents() { fpAlgo = new CbmAlgoBuildRawEvents(); }
void CbmDeviceBuildDigiEvents::InitTask()
try {
/// Read options from executable
LOG(info) << "Init options for CbmDeviceBuildDigiEvents.";
fbFillHistos = fConfig->GetValue<bool>("FillHistos");
fbIgnoreTsOverlap = fConfig->GetValue<bool>("IgnOverMs");

Pierre-Alain Loizeau
committed
fsEvtOverMode = fConfig->GetValue<std::string>("EvtOverMode");
fsRefDet = fConfig->GetValue<std::string>("RefDet");
fvsAddDet = fConfig->GetValue<std::vector<std::string>>("AddDet");
fvsDelDet = fConfig->GetValue<std::vector<std::string>>("DelDet");
fvsSetTrigWin = fConfig->GetValue<std::vector<std::string>>("SetTrigWin");
fvsSetTrigMinNb = fConfig->GetValue<std::vector<std::string>>("SetTrigMinNb");
fvsSetTrigMaxNb = fConfig->GetValue<std::vector<std::string>>("SetTrigMaxNb");
fvsSetTrigMinLayersNb = fConfig->GetValue<std::vector<std::string>>("SetTrigMinLayersNb");

Pierre-Alain Loizeau
committed
fvsSetHistMaxDigiNb = fConfig->GetValue<std::vector<std::string>>("SetHistMaxDigiNb");

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
fbDoNotSend = fConfig->GetValue<bool>("DoNotSend");

Pierre-Alain Loizeau
committed
fbDigiEventOutput = fConfig->GetValue<bool>("DigiEventOutput");
fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
fsChannelNameDataOutput = fConfig->GetValue<std::string>("EvtNameOut");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsAllowedChannels[0] = fsChannelNameDataInput;

Pierre-Alain Loizeau
committed
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDeviceBuildDigiEvents::HandleData);
}
}
/// FIXME: Disable clang formatting for now as it corrupts all alignment
/* clang-format off */
/// Initialize the Algorithm parameters
fpAlgo->SetFillHistos(fbFillHistos);
fpAlgo->SetIgnoreTsOverlap(fbIgnoreTsOverlap);
/// Extract Event Overlap Mode

Pierre-Alain Loizeau
committed
EOverlapModeRaw mode = ("NoOverlap" == fsEvtOverMode ? EOverlapModeRaw::NoOverlap
: ("MergeOverlap" == fsEvtOverMode ? EOverlapModeRaw::MergeOverlap
: ("AllowOverlap" == fsEvtOverMode ? EOverlapModeRaw::AllowOverlap
: EOverlapModeRaw::NoOverlap)));

Pierre-Alain Loizeau
committed
fpAlgo->SetEventOverlapMode(mode);
/// Extract refdet

Pierre-Alain Loizeau
committed
RawEventBuilderDetector refDet = GetDetectorBuilderCfg(fsRefDet);

Pierre-Alain Loizeau
committed
if (kRawEventBuilderDetUndef != refDet) {
fpAlgo->SetReferenceDetector(refDet);
}
else {
LOG(info) << "CbmDeviceBuildDigiEvents::InitTask => Trying to change "
"reference to unsupported detector, ignored! "
<< fsRefDet;
}
/// Extract detector to add if any
for (std::vector<std::string>::iterator itStrAdd = fvsAddDet.begin();
itStrAdd != fvsAddDet.end();
++itStrAdd) {

Pierre-Alain Loizeau
committed
RawEventBuilderDetector addDet = GetDetectorBuilderCfg(*itStrAdd);

Pierre-Alain Loizeau
committed
if (kRawEventBuilderDetUndef != addDet) {
fpAlgo->AddDetector(addDet);
}
else {
LOG(info) << "CbmDeviceBuildDigiEvents::InitTask => Trying to add "
"unsupported detector, ignored! "
<< (*itStrAdd);
continue;
}
}

Pierre-Alain Loizeau
committed
/// Extract detector to remove if any

Pierre-Alain Loizeau
committed
for (std::vector<std::string>::iterator itStrRem = fvsDelDet.begin();
itStrRem != fvsDelDet.end();
++itStrRem) {

Pierre-Alain Loizeau
committed
RawEventBuilderDetector remDet = GetDetectorBuilderCfg(*itStrRem);

Pierre-Alain Loizeau
committed
if (kRawEventBuilderDetUndef != remDet) {
fpAlgo->RemoveDetector(remDet);
}
else {
LOG(info) << "CbmDeviceBuildDigiEvents::InitTask => Trying to remove "
"unsupported detector, ignored! "
<< (*itStrRem);
continue;
}
}

Pierre-Alain Loizeau
committed
/// Extract Trigger window to add if any

Pierre-Alain Loizeau
committed
for (std::vector<std::string>::iterator itStrTrigWin = fvsSetTrigWin.begin();
itStrTrigWin != fvsSetTrigWin.end();
++itStrTrigWin) {
size_t charPosDel = (*itStrTrigWin).find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger window with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found "
<< (*itStrTrigWin) << " )";
continue;
}
/// Detector Enum Tag
std::string sSelDet = (*itStrTrigWin).substr(0, charPosDel);

Pierre-Alain Loizeau
committed
ECbmModuleId selDet = GetDetectorId(sSelDet);

Pierre-Alain Loizeau
committed
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
if (ECbmModuleId::kNotExist == selDet) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger window for unsupported detector, ignored! "
<< sSelDet;
continue;
}
/// Window beginning
charPosDel++;
std::string sNext = (*itStrTrigWin).substr(charPosDel);
charPosDel = sNext.find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger window with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found "
<< (*itStrTrigWin) << " )";
continue;
}
Double_t dWinBeg = std::stod(sNext.substr(0, charPosDel));
/// Window end
charPosDel++;
Double_t dWinEnd = std::stod(sNext.substr(charPosDel));
fpAlgo->SetTriggerWindow(selDet, dWinBeg, dWinEnd);
}

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
/// Extract MinNb for trigger if any

Pierre-Alain Loizeau
committed
for (std::vector<std::string>::iterator itStrMinNb = fvsSetTrigMinNb.begin();
itStrMinNb != fvsSetTrigMinNb.end();
++itStrMinNb) {
size_t charPosDel = (*itStrMinNb).find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger min Nb with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,uMinNb but instead found " << (*itStrMinNb)
<< " )";
continue;
}
/// Detector Enum Tag
std::string sSelDet = (*itStrMinNb).substr(0, charPosDel);

Pierre-Alain Loizeau
committed
ECbmModuleId selDet = GetDetectorId(sSelDet);

Pierre-Alain Loizeau
committed
if (ECbmModuleId::kNotExist == selDet) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger min Nb for unsupported detector, ignored! "
<< sSelDet;
continue;
}
/// Min number
charPosDel++;
UInt_t uMinNb = std::stoul((*itStrMinNb).substr(charPosDel));
fpAlgo->SetTriggerMinNumber(selDet, uMinNb);
}

Pierre-Alain Loizeau
committed
/// Extract MaxNb for trigger if any
for (std::vector<std::string>::iterator itStrMaxNb = fvsSetTrigMaxNb.begin();
itStrMaxNb != fvsSetTrigMaxNb.end();
++itStrMaxNb) {
size_t charPosDel = (*itStrMaxNb).find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger Max Nb with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,uMaxNb but instead found " << (*itStrMaxNb)
<< " )";
continue;
}
/// Detector Enum Tag
std::string sSelDet = (*itStrMaxNb).substr(0, charPosDel);

Pierre-Alain Loizeau
committed
ECbmModuleId selDet = GetDetectorId(sSelDet);

Pierre-Alain Loizeau
committed
if (ECbmModuleId::kNotExist == selDet) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger Max Nb for unsupported detector, ignored! "
<< sSelDet;
continue;
}
/// Max number
charPosDel++;

Pierre-Alain Loizeau
committed
Int_t iMaxNb = std::stol((*itStrMaxNb).substr(charPosDel));

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
fpAlgo->SetTriggerMaxNumber(selDet, iMaxNb);

Pierre-Alain Loizeau
committed
}

Pierre-Alain Loizeau
committed
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/// Extract MinLayersNb for trigger if any
for (std::vector<std::string>::iterator itStrMinLayersNb = fvsSetTrigMinLayersNb.begin();
itStrMinLayersNb != fvsSetTrigMinLayersNb.end();
++itStrMinLayersNb) {
size_t charPosDel = (*itStrMinLayersNb).find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger min layers Nb with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,uMinLayersNb but instead found " << (*itStrMinLayersNb)
<< " )";
continue;
}
/// Detector Enum Tag
std::string sSelDet = (*itStrMinLayersNb).substr(0, charPosDel);
ECbmModuleId selDet = GetDetectorId(sSelDet);
if (ECbmModuleId::kNotExist == selDet) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set trigger min layers Nb for unsupported detector, ignored! "
<< sSelDet;
continue;
}
/// Min number
charPosDel++;
UInt_t uMinLayersNb = std::stoul((*itStrMinLayersNb).substr(charPosDel));
fpAlgo->SetTriggerMinLayersNumber(selDet, uMinLayersNb);
}

Pierre-Alain Loizeau
committed
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
/// Extract Histograms Max Digi limits if any
for (std::vector<std::string>::iterator itStrHistMaxDigi = fvsSetHistMaxDigiNb.begin();
itStrHistMaxDigi != fvsSetHistMaxDigiNb.end();
++itStrHistMaxDigi) {
size_t charPosDel = (*itStrHistMaxDigi).find(',');
if (std::string::npos == charPosDel) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set Histos max Digi nb with invalid option pattern, ignored! "
<< " (Should be ECbmModuleId,dMaxDigiNb but instead found " << (*itStrHistMaxDigi)
<< " )";
continue;
}
/// Detector Enum Tag
std::string sSelDet = (*itStrHistMaxDigi).substr(0, charPosDel);
ECbmModuleId selDet = GetDetectorId(sSelDet);
if (ECbmModuleId::kNotExist == selDet) {
LOG(info)
<< "CbmDeviceBuildDigiEvents::InitTask => "
<< "Trying to set Histos max Digi nb for unsupported detector, ignored! "
<< sSelDet;
continue;
}
/// Min number
charPosDel++;
Double_t dHistMaxDigiNb = std::stod((*itStrHistMaxDigi).substr(charPosDel));
LOG(debug) << "set Histos max Digi nb to " << dHistMaxDigiNb;
fpAlgo->SetHistogramMaxDigiNb(selDet, dHistMaxDigiNb);
}

Pierre-Alain Loizeau
committed
/// FIXME: Re-enable clang formatting after formatted lines
/* clang-format on */
/// Create input vectors
fvDigiBmon = new std::vector<CbmBmonDigi>(1000000);
fvDigiSts = new std::vector<CbmStsDigi>(1000000);
fvDigiMuch = new std::vector<CbmMuchDigi>(1000000);
fvDigiTrd = new std::vector<CbmTrdDigi>(1000000);
fvDigiTof = new std::vector<CbmTofDigi>(1000000);
fvDigiRich = new std::vector<CbmRichDigi>(1000000);
fvDigiPsd = new std::vector<CbmPsdDigi>(1000000);

Pierre-Alain Loizeau
committed
fCbmTsEventHeader = new CbmTsEventHeader();
/// Digis storage
fpAlgo->SetDigis(fvDigiSts);
fpAlgo->SetDigis(fvDigiMuch);
fpAlgo->SetDigis(fvDigiTrd);
fpAlgo->SetDigis(fvDigiTof);
fpAlgo->SetDigis(fvDigiRich);
fpAlgo->SetDigis(fvDigiPsd);

Pierre-Alain Loizeau
committed
// Mvd currently not implemented in event builder
//std::vector<CbmMvdDigi>* pMvdDigi = new std::vector<CbmMvdDigi>();
fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
if (NULL == fTimeSliceMetaDataArray) { throw InitTaskError("Failed creating the TS meta data TClonesarray "); }
fpAlgo->SetTimeSliceMetaDataArray(fTimeSliceMetaDataArray);
/// Now that everything is set, initialize the Algorithm
if (kFALSE == fpAlgo->InitAlgo()) { throw InitTaskError("Failed to initialize the algorithm class."); }

Pierre-Alain Loizeau
committed
/// Histograms management
if (kTRUE == fbFillHistos) {
/// Comment to prevent clang format single lining
if (kFALSE == InitHistograms()) { throw InitTaskError("Failed to initialize the histograms."); }

Pierre-Alain Loizeau
committed
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
}
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmDeviceBuildDigiEvents::IsChannelNameAllowed(std::string channelName)
{
for (auto const& entry : fsAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
}
}
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}

Pierre-Alain Loizeau
committed
RawEventBuilderDetector CbmDeviceBuildDigiEvents::GetDetectorBuilderCfg(std::string detName)
{
/// FIXME: Disable clang formatting for now as it corrupts all alignment
/* clang-format off */
RawEventBuilderDetector cfgDet = ("kBmon" == detName ? kRawEventBuilderDetBmon

Pierre-Alain Loizeau
committed
: ("kSts" == detName ? kRawEventBuilderDetSts
: ("kMuch" == detName ? kRawEventBuilderDetMuch
: ("kTrd" == detName ? kRawEventBuilderDetTrd
: ("kTrd2D" == detName ? kRawEventBuilderDetTrd2D
: ("kTof" == detName ? kRawEventBuilderDetTof
: ("kRich" == detName ? kRawEventBuilderDetRich
: ("kPsd" == detName ? kRawEventBuilderDetPsd
: kRawEventBuilderDetUndef))))))));
return cfgDet;
/// FIXME: Re-enable clang formatting after formatted lines
/* clang-format on */
}
ECbmModuleId CbmDeviceBuildDigiEvents::GetDetectorId(std::string detName)
{
/// FIXME: Disable clang formatting for now as it corrupts all alignment
/* clang-format off */
ECbmModuleId detId = ("kBmon" == detName ? ECbmModuleId::kBmon

Pierre-Alain Loizeau
committed
: ("kSts" == detName ? ECbmModuleId::kSts
: ("kMuch" == detName ? ECbmModuleId::kMuch
: ("kTrd" == detName ? ECbmModuleId::kTrd
: ("kTrd2D" == detName ? ECbmModuleId::kTrd2d
: ("kTof" == detName ? ECbmModuleId::kTof
: ("kRich" == detName ? ECbmModuleId::kRich
: ("kPsd" == detName ? ECbmModuleId::kPsd
: ECbmModuleId::kNotExist))))))));
return detId;
/// FIXME: Re-enable clang formatting after formatted lines
/* clang-format on */
}
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
bool CbmDeviceBuildDigiEvents::InitHistograms()
{
bool initOK = true;
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fpAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector<std::pair<TCanvas*, std::string>> vCanvases = fpAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add(vHistos[uHisto].first);
std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
fvpsHistosFolder.push_back(psHistoConfig);
LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[uCanv].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
fvpsCanvasConfig.push_back(psCanvConfig);
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
return initOK;
}

Pierre-Alain Loizeau
committed
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceBuildDigiEvents::HandleData(FairMQParts& parts, int /*index*/)
{
fulNumMessages++;
LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts"
<< ", size0: " << parts.At(0)->GetSize();
if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
/// Extract unpacked data from input message
uint32_t uPartIdx = 0;
/// TS header
// Deserialize<RootSerializer>(*parts.At(uPartIdx), fCbmTsEventHeader);
RootSerializer().Deserialize(*parts.At(uPartIdx), fCbmTsEventHeader);

Pierre-Alain Loizeau
committed
++uPartIdx;

Pierre-Alain Loizeau
committed
if (0 < (parts.At(uPartIdx))->GetSize()) {
std::string msgStrBmon(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issBmon(msgStrBmon);
boost::archive::binary_iarchive inputArchiveBmon(issBmon);
inputArchiveBmon >> *fvDigiBmon;

Pierre-Alain Loizeau
committed
}

Pierre-Alain Loizeau
committed
++uPartIdx;
/// STS

Pierre-Alain Loizeau
committed
if (0 < (parts.At(uPartIdx))->GetSize()) {
std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issSts(msgStrSts);
boost::archive::binary_iarchive inputArchiveSts(issSts);
inputArchiveSts >> *fvDigiSts;
}

Pierre-Alain Loizeau
committed
++uPartIdx;
/// MUCH

Pierre-Alain Loizeau
committed
if (0 < (parts.At(uPartIdx))->GetSize()) {
std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issMuch(msgStrMuch);
boost::archive::binary_iarchive inputArchiveMuch(issMuch);
inputArchiveMuch >> *fvDigiMuch;
}

Pierre-Alain Loizeau
committed
++uPartIdx;
/// TRD

Pierre-Alain Loizeau
committed
if (0 < (parts.At(uPartIdx))->GetSize()) {
std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issTrd(msgStrTrd);
boost::archive::binary_iarchive inputArchiveTrd(issTrd);
inputArchiveTrd >> *fvDigiTrd;
}

Pierre-Alain Loizeau
committed
++uPartIdx;

Pierre-Alain Loizeau
committed
if (0 < (parts.At(uPartIdx))->GetSize()) {
std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issTof(msgStrTof);
boost::archive::binary_iarchive inputArchiveTof(issTof);
inputArchiveTof >> *fvDigiTof;
}

Pierre-Alain Loizeau
committed
++uPartIdx;
/// RICH

Pierre-Alain Loizeau
committed
if (0 < (parts.At(uPartIdx))->GetSize()) {
std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issRich(msgStrRich);
boost::archive::binary_iarchive inputArchiveRich(issRich);
inputArchiveRich >> *fvDigiRich;
}

Pierre-Alain Loizeau
committed
++uPartIdx;
/// PSD

Pierre-Alain Loizeau
committed
if (0 < (parts.At(uPartIdx))->GetSize()) {
std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issPsd(msgStrPsd);
boost::archive::binary_iarchive inputArchivePsd(issPsd);
inputArchivePsd >> *fvDigiPsd;
}

Pierre-Alain Loizeau
committed
++uPartIdx;
/// TS metadata
// Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
RootSerializer().Deserialize(*parts.At(uPartIdx), fTsMetaData);

Pierre-Alain Loizeau
committed
new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
TimesliceMetaData(std::move(*fTsMetaData));
++uPartIdx;
LOG(debug) << "Bmon Vector size: " << fvDigiBmon->size();

Pierre-Alain Loizeau
committed
LOG(debug) << "STS Vector size: " << fvDigiSts->size();
LOG(debug) << "MUCH Vector size: " << fvDigiMuch->size();
LOG(debug) << "TRD Vector size: " << fvDigiTrd->size();
LOG(debug) << "TOF Vector size: " << fvDigiTof->size();
LOG(debug) << "RICH Vector size: " << fvDigiRich->size();
LOG(debug) << "PSD Vector size: " << fvDigiPsd->size();
if (1 == fulNumMessages) {
/// First message received
fpAlgo->SetTsParameters(0, fTsMetaData->GetDuration(), fTsMetaData->GetOverlapDuration());
}

Pierre-Alain Loizeau
committed
/// Call Algo ProcessTs method
fpAlgo->ProcessTs();
/// Send events vector to ouput

Pierre-Alain Loizeau
committed
if (!fbDoNotSend) {
if (fbDigiEventOutput) {
if (!(SendDigiEvents(parts))) return false;
}
else {
if (!(SendEvents(parts))) return false;
}
}

Pierre-Alain Loizeau
committed
/// Clear metadata
fTimeSliceMetaDataArray->Clear();
/// Clear vectors

Pierre-Alain Loizeau
committed
fvDigiSts->clear();
fvDigiMuch->clear();
fvDigiTrd->clear();
fvDigiTof->clear();
fvDigiRich->clear();
fvDigiPsd->clear();
/// Clear event vector after usage
fpAlgo->ClearEventVector();
/// Histograms management
if (kTRUE == fbFillHistos) {
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
if (!fbConfigSent) {
// Send the configuration only once per run!
fbConfigSent = SendHistoConfAndData();
} // if( !fbConfigSent )
else
SendHistograms();

Pierre-Alain Loizeau
committed
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )

Pierre-Alain Loizeau
committed
}
return true;
}
bool CbmDeviceBuildDigiEvents::SendEvents(FairMQParts& partsIn)
{
/// Get vector reference from algo
std::vector<CbmEvent*> vEvents = fpAlgo->GetEventVector();

Pierre-Alain Loizeau
committed
/// Move CbmEvent from temporary vector to std::vector of full objects
LOG(debug) << "Vector size: " << vEvents.size();
std::vector<CbmEvent> vOutEvents;

Pierre-Alain Loizeau
committed
for (CbmEvent* event : vEvents) {

Pierre-Alain Loizeau
committed
LOG(debug) << "Vector ptr: " << event->ToString();
vOutEvents.push_back(std::move(*event));
LOG(debug) << "Vector obj: " << vOutEvents[(vOutEvents.size()) - 1].ToString();

Pierre-Alain Loizeau
committed
}
/// Serialize the array of events into a single MQ message

Pierre-Alain Loizeau
committed
/// FIXME: Find out if possible to use only the boost serializer

Pierre-Alain Loizeau
committed
FairMQMessagePtr message(NewMessage());
// Serialize<RootSerializer>(*message, &(vOutEvents));
RootSerializer().Serialize(*message, &(vOutEvents));

Pierre-Alain Loizeau
committed
/*
std::stringstream ossEvt;
boost::archive::binary_oarchive oaEvt(ossEvt);
oaEvt << vOutEvents;
std::string* strMsgEvt = new std::string(ossEvt.str());
*/

Pierre-Alain Loizeau
committed
/// Add it at the end of the input composed message

Pierre-Alain Loizeau
committed
/// FIXME: Find out if possible to use only the boost serializer

Pierre-Alain Loizeau
committed
FairMQParts partsOut(std::move(partsIn));
partsOut.AddPart(std::move(message));

Pierre-Alain Loizeau
committed
/*
partsOut.AddPart(NewMessage(
const_cast<char*>(strMsgEvt->c_str()), // data
strMsgEvt->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgEvt)); // object that manages the data
*/

Pierre-Alain Loizeau
committed
if (Send(partsOut, fsChannelNameDataOutput) < 0) {

Pierre-Alain Loizeau
committed
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
LOG(error) << "Problem sending data to " << fsChannelNameDataOutput;
return false;
}
vOutEvents.clear();
return true;
}
bool CbmDeviceBuildDigiEvents::SendDigiEvents(FairMQParts& partsIn)
{
/// Get vector reference from algo
std::vector<CbmEvent*> vEvents = fpAlgo->GetEventVector();
/// Move CbmEvent from temporary vector to std::vector of full objects
LOG(debug) << "In Vector size: " << vEvents.size();
std::vector<CbmDigiEvent> vOutEvents;
vOutEvents.reserve(vEvents.size());
for (CbmEvent* event : vEvents) {
CbmDigiEvent selEvent;
selEvent.fTime = event->GetStartTime();
selEvent.fNumber = event->GetNumber();
/// FIXME: for pure digi based event, we select "continuous slices of digis"
/// => Copy block of [First Digi index, last digi index] with assign(it_start, it_stop)
/// FIXME: Keep TRD1D + TRD2D support, may lead to holes in the digi sequence!
/// => Would need to keep the loop
/// Get the proper order for block selection as TRD1D and TRD2D may insert indices in separate loops
/// => Needed to ensure that the start and stop of the block copy do not trigger a vector size exception
event->SortIndices();

Pierre-Alain Loizeau
committed
/// for each detector, find the data in the Digi vectors and copy them

Pierre-Alain Loizeau
committed
/// TODO: Template + loop on list of data types?
/// ==> Bmon
uint32_t uNbDigis =
(0 < event->GetNofData(ECbmDataType::kBmonDigi) ? event->GetNofData(ECbmDataType::kBmonDigi) : 0);

Pierre-Alain Loizeau
committed
if (uNbDigis) {
auto startIt = fvDigiBmon->begin() + event->GetIndex(ECbmDataType::kBmonDigi, 0);
auto stopIt = fvDigiBmon->begin() + event->GetIndex(ECbmDataType::kBmonDigi, uNbDigis - 1);

Pierre-Alain Loizeau
committed
++stopIt;
selEvent.fData.fBmon.fDigis.assign(startIt, stopIt);

Pierre-Alain Loizeau
committed
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
}
/// ==> STS
uNbDigis = (0 < event->GetNofData(ECbmDataType::kStsDigi) ? event->GetNofData(ECbmDataType::kStsDigi) : 0);
if (uNbDigis) {
auto startIt = fvDigiSts->begin() + event->GetIndex(ECbmDataType::kStsDigi, 0);
auto stopIt = fvDigiSts->begin() + event->GetIndex(ECbmDataType::kStsDigi, uNbDigis - 1);
++stopIt;
selEvent.fData.fSts.fDigis.assign(startIt, stopIt);
}
/// ==> MUCH
uNbDigis = (0 < event->GetNofData(ECbmDataType::kMuchDigi) ? event->GetNofData(ECbmDataType::kMuchDigi) : 0);
if (uNbDigis) {
auto startIt = fvDigiMuch->begin() + event->GetIndex(ECbmDataType::kMuchDigi, 0);
auto stopIt = fvDigiMuch->begin() + event->GetIndex(ECbmDataType::kMuchDigi, uNbDigis - 1);
++stopIt;
selEvent.fData.fMuch.fDigis.assign(startIt, stopIt);
}
/// ==> TRD + TRD2D
uNbDigis = (0 < event->GetNofData(ECbmDataType::kTrdDigi) ? event->GetNofData(ECbmDataType::kTrdDigi) : 0);
if (uNbDigis) {
auto startIt = fvDigiTrd->begin() + event->GetIndex(ECbmDataType::kTrdDigi, 0);
auto stopIt = fvDigiTrd->begin() + event->GetIndex(ECbmDataType::kTrdDigi, uNbDigis - 1);
++stopIt;
selEvent.fData.fTrd.fDigis.assign(startIt, stopIt);
}
/// ==> TOF
uNbDigis = (0 < event->GetNofData(ECbmDataType::kTofDigi) ? event->GetNofData(ECbmDataType::kTofDigi) : 0);
if (uNbDigis) {
auto startIt = fvDigiTof->begin() + event->GetIndex(ECbmDataType::kTofDigi, 0);
auto stopIt = fvDigiTof->begin() + event->GetIndex(ECbmDataType::kTofDigi, uNbDigis - 1);
++stopIt;
selEvent.fData.fTof.fDigis.assign(startIt, stopIt);
}
/// ==> RICH
uNbDigis = (0 < event->GetNofData(ECbmDataType::kRichDigi) ? event->GetNofData(ECbmDataType::kRichDigi) : 0);
if (uNbDigis) {
auto startIt = fvDigiRich->begin() + event->GetIndex(ECbmDataType::kRichDigi, 0);
auto stopIt = fvDigiRich->begin() + event->GetIndex(ECbmDataType::kRichDigi, uNbDigis - 1);
++stopIt;
selEvent.fData.fRich.fDigis.assign(startIt, stopIt);
}
/// ==> PSD
uNbDigis = (0 < event->GetNofData(ECbmDataType::kPsdDigi) ? event->GetNofData(ECbmDataType::kPsdDigi) : 0);
if (uNbDigis) {
auto startIt = fvDigiPsd->begin() + event->GetIndex(ECbmDataType::kPsdDigi, 0);
auto stopIt = fvDigiPsd->begin() + event->GetIndex(ECbmDataType::kPsdDigi, uNbDigis - 1);
++stopIt;
selEvent.fData.fPsd.fDigis.assign(startIt, stopIt);
}
vOutEvents.push_back(std::move(selEvent));
}
LOG(debug) << "Out Vector size: " << vEvents.size();
/// Serialize the array of events into a single MQ message
std::stringstream ossEvt;
boost::archive::binary_oarchive oaEvt(ossEvt);
oaEvt << vOutEvents;
std::string* strMsgEvt = new std::string(ossEvt.str());

Pierre-Alain Loizeau
committed
FairMQMessagePtr message(NewMessage(
const_cast<char*>(strMsgEvt->c_str()), // data
strMsgEvt->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgEvt)); // object that manages the data

Pierre-Alain Loizeau
committed
LOG(debug) << "Serializing done";
/// Make a new composed messaged with TsHeader + vector of Digi Event + TsMetaData
/// FIXME: Find out if possible to use only the boost serializer
FairMQParts partsOut;
partsOut.AddPart(std::move(partsIn.At(0))); // TsHeader
partsOut.AddPart(std::move(partsIn.At(partsIn.Size() - 1))); // TsMetaData
partsOut.AddPart(std::move(message)); // DigiEvent vector
LOG(debug) << "Message preparation done";
if (Send(partsOut, fsChannelNameDataOutput) < 0) {

Pierre-Alain Loizeau
committed
LOG(error) << "Problem sending data to " << fsChannelNameDataOutput;
return false;
}

Pierre-Alain Loizeau
committed
vOutEvents.clear();

Pierre-Alain Loizeau
committed
return true;
}
bool CbmDeviceBuildDigiEvents::SendHistoConfAndData()
{
/// Prepare multiparts message and header
std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
FairMQMessagePtr messageHeader(NewMessage());
// Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
FairMQParts partsOut;
partsOut.AddPart(std::move(messageHeader));
for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
partsOut.AddPart(std::move(messageHist));
} // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
/// Catch case where no histos are registered!
/// => Add empty message
if (0 == fvpsHistosFolder.size()) {
FairMQMessagePtr messageHist(NewMessage());
partsOut.AddPart(std::move(messageHist));
}
for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
partsOut.AddPart(std::move(messageCan));
} // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
/// Catch case where no Canvases are registered!
/// => Add empty message
if (0 == fvpsCanvasConfig.size()) {
FairMQMessagePtr messageHist(NewMessage());
partsOut.AddPart(std::move(messageHist));
}
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr msgHistos(NewMessage());
// Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
RootSerializer().Serialize(*msgHistos, &fArrayHisto);
partsOut.AddPart(std::move(msgHistos));
/// Send the multi-parts message to the common histogram messages queue
if (Send(partsOut, fsChannelNameHistosInput) < 0) {
LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data";
return false;
} // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
fpAlgo->ResetHistograms(kFALSE);
return true;
}

Pierre-Alain Loizeau
committed
bool CbmDeviceBuildDigiEvents::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);

Pierre-Alain Loizeau
committed
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )

Pierre-Alain Loizeau
committed
/// Reset the histograms after sending them (but do not reset the time)
fpAlgo->ResetHistograms(kFALSE);
return true;
}
CbmDeviceBuildDigiEvents::~CbmDeviceBuildDigiEvents()
{
/// Clear metadata

Pierre-Alain Loizeau
committed
if (fCbmTsEventHeader) delete fCbmTsEventHeader;

Pierre-Alain Loizeau
committed
/// Clear vectors

Pierre-Alain Loizeau
committed
if (fvDigiSts) fvDigiSts->clear();
if (fvDigiMuch) fvDigiMuch->clear();
if (fvDigiTrd) fvDigiTrd->clear();
if (fvDigiTof) fvDigiTof->clear();
if (fvDigiRich) fvDigiRich->clear();
if (fvDigiPsd) fvDigiPsd->clear();

Pierre-Alain Loizeau
committed
/// Clear metadata

Pierre-Alain Loizeau
committed
if (fTimeSliceMetaDataArray) {
fTimeSliceMetaDataArray->Clear();
delete fTsMetaData;

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
delete fTimeSliceMetaDataArray;
}
if (fpAlgo) delete fpAlgo;

Pierre-Alain Loizeau
committed
}
void CbmDeviceBuildDigiEvents::Finish() {}