Newer
Older
/* Copyright (C) 2020-2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */

Pierre-Alain Loizeau
committed
/**
* CbmDeviceMcbmEventSink.cxx
*
* @since 2020-05-24
* @author P.-A. Loizeau
*/
#include "CbmDeviceMcbmEventSink.h"
/// CBM headers
#include "CbmEvent.h"
#include "CbmFlesCanvasTools.h"

Pierre-Alain Loizeau
committed
/// FAIRROOT headers
#include "FairMQLogger.h"

Pierre-Alain Loizeau
committed
#include "FairParGenericSet.h"
#include "FairRootFileSink.h"
#include "FairRootManager.h"
#include "FairRunOnline.h"
#include "BoostSerializer.h"

Pierre-Alain Loizeau
committed
#include "RootSerializer.h"
/// FAIRSOFT headers (geant, boost, ...)
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"

Pierre-Alain Loizeau
committed
#include <boost/archive/binary_iarchive.hpp>

Pierre-Alain Loizeau
committed
/// C/C++ headers

Pierre-Alain Loizeau
committed
#include <array>
#include <iomanip>

Pierre-Alain Loizeau
committed
#include <stdexcept>
/// Exception type used to report a failed device initialisation.
/// It only forwards the error message to std::runtime_error.
struct InitTaskError : std::runtime_error {
  /// Build the error from a std::string message
  explicit InitTaskError(const std::string& what_arg) : std::runtime_error(what_arg) {}
  /// Build the error from a C-string message
  explicit InitTaskError(const char* what_arg) : std::runtime_error(what_arg) {}
};

Pierre-Alain Loizeau
committed
using namespace std;
//Bool_t bMcbm2018MonitorTaskBmonResetHistos = kFALSE;

Pierre-Alain Loizeau
committed
void CbmDeviceMcbmEventSink::InitTask()
try {
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/// Read options from executable
LOG(info) << "Init options for CbmDeviceMcbmEventSink.";
fsOutputFileName = fConfig->GetValue<std::string>("OutFileName");
fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn");
fsAllowedChannels[0] = fsChannelNameDataInput;
fbFillHistos = fConfig->GetValue<bool>("FillHistos");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
/// Associate the MissedTs Channel to the corresponding handler
OnData(fsChannelNameMissedTs, &CbmDeviceMcbmEventSink::HandleMissTsData);
/// Associate the command Channel to the corresponding handler
OnData(fsChannelNameCommands, &CbmDeviceMcbmEventSink::HandleCommand);
/// Associate the Event + Unp data Channel to the corresponding handler
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDeviceMcbmEventSink::HandleData);
} // if( entry.first.find( "ts" )
} // for( auto const &entry : fChannels )
// InitContainers();
/// Create input vectors
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
fvDigiSts = new std::vector<CbmStsDigi>();
fvDigiMuch = new std::vector<CbmMuchBeamTimeDigi>();
fvDigiTrd = new std::vector<CbmTrdDigi>();
fvDigiTof = new std::vector<CbmTofDigi>();
fvDigiRich = new std::vector<CbmRichDigi>();
fvDigiPsd = new std::vector<CbmPsdDigi>();
/// Prepare storage TClonesArrays
/// TS MetaData storage
fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
if (NULL == fTimeSliceMetaDataArray) {
throw InitTaskError("Failed creating the TS meta data TClonesarray ");
} // if( NULL == fTimeSliceMetaDataArray )
/// Events storage
/// TODO: remove TObject from CbmEvent and switch to vectors!
fEventsArray = new TClonesArray("CbmEvent", 500);
if (NULL == fEventsArray) {
throw InitTaskError("Failed creating the Events TClonesarray ");
} // if( NULL == fEventsArray )
/// Prepare root output
if ("" != fsOutputFileName) {
fpRun = new FairRunOnline();
fpFairRootMgr = FairRootManager::Instance();
fpFairRootMgr->SetSink(new FairRootFileSink(fsOutputFileName));
if (nullptr == fpFairRootMgr->GetOutFile()) {
throw InitTaskError("Could not open root file");
} // if( nullptr == fpFairRootMgr->GetOutFile() )
} // if( "" != fsOutputFileName )
else {
throw InitTaskError("Empty output filename!");
} // else of if( "" != fsOutputFileName )
LOG(info) << "Init Root Output to " << fsOutputFileName;
fpFairRootMgr->InitSink();
// fEvtHeader = new FairEventHeader();
// fEvtHeader->SetRunId(iRunId);
// rootMgr->Register("EventHeader.", "Event", fEvtHeader, kTRUE);
// rootMgr->FillEventHeader(fEvtHeader);
/// Register all input data members with the FairRoot manager
/// TS MetaData
fpFairRootMgr->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kTRUE);
fpFairRootMgr->RegisterAny("BmonDigi", fvDigiBmon, kTRUE);
fpFairRootMgr->RegisterAny("StsDigi", fvDigiSts, kTRUE);
fpFairRootMgr->RegisterAny("MuchBeamTimeDigi", fvDigiMuch, kTRUE);
fpFairRootMgr->RegisterAny("TrdDigi", fvDigiTrd, kTRUE);
fpFairRootMgr->RegisterAny("TofDigi", fvDigiTof, kTRUE);
fpFairRootMgr->RegisterAny("RichDigi", fvDigiRich, kTRUE);
fpFairRootMgr->RegisterAny("PsdDigi", fvDigiPsd, kTRUE);
/// CbmEvent
fpFairRootMgr->Register("CbmEvent", "Cbm Event", fEventsArray, kTRUE);
/*

Pierre-Alain Loizeau
committed
TTree* outTree =new TTree(FairRootManager::GetTreeName(), "/cbmout", 99);
LOG(info) << "define Tree " << outTree->GetName();
fpFairRootMgr->GetSink()->SetOutTree(outTree);
*/

Pierre-Alain Loizeau
committed
LOG(info) << "Initialized outTree with rootMgr at " << fpFairRootMgr;

Pierre-Alain Loizeau
committed
/// Histograms management
if (kTRUE == fbFillHistos) {
/*

Pierre-Alain Loizeau
committed
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector< std::pair< TNamed *, std::string > > vHistos = fpAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector< std::pair< TCanvas *, std::string > > vCanvases = fpAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
{
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add( vHistos[ uHisto ].first );
std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
vHistos[ uHisto ].second );
fvpsHistosFolder.push_back( psHistoConfig );
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist( NewMessage() );
// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
BoostSerializer < std::pair< std::string, std::string > >.Serialize( *messageHist, psHistoConfig );

Pierre-Alain Loizeau
committed
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
/// Send message to the common histogram config messages queue
if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
{
throw InitTaskError( "Problem sending histo config" );
} // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
LOG(info) << "Config of hist " << psHistoConfig.first.data()
<< " in folder " << psHistoConfig.second.data() ;
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
{
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
fvpsCanvasConfig.push_back( psCanvConfig );
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan( NewMessage() );
// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
BoostSerializer < std::pair< std::string, std::string > >.Serialize( *messageCan, psCanvConfig );

Pierre-Alain Loizeau
committed
/// Send message to the common canvas config messages queue
if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
{
throw InitTaskError( "Problem sending canvas config" );
} // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
<< " is " << psCanvConfig.second.data() ;
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
*/
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);

Pierre-Alain Loizeau
committed
}
/// Check whether a channel name contains one of the entries of fsAllowedChannels as a substring.
/// @param channelName name of the channel to check
/// @return true as soon as one allowed entry is found inside channelName, false if none matches
bool CbmDeviceMcbmEventSink::IsChannelNameAllowed(std::string channelName)
{
  for (std::size_t idx = 0; idx < fsAllowedChannels.size(); ++idx) {
    const std::string& entry = fsAllowedChannels[idx];
    if (std::string::npos == channelName.find(entry)) continue;

    LOG(info) << "Found " << entry << " in " << channelName;
    LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
    return true;
  }  // for( std::size_t idx = 0; idx < fsAllowedChannels.size(); ++idx )

  LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
  /// Explicit negative answer: flowing off the end of a bool function is undefined behaviour
  return false;
}
/*
Bool_t CbmDeviceMcbmEventSink::InitContainers()
{
LOG(info) << "Init parameter containers for CbmDeviceMcbmEventSink.";
if( kFALSE == InitParameters( fpAlgo ->GetParList() ) )
return kFALSE;
/// Need to add accessors for all options
fpAlgo ->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
Bool_t initOK = fpAlgo->InitContainers();
// Bool_t initOK = fMonitorAlgo->ReInitContainers();
return initOK;
}
Bool_t CbmDeviceMcbmEventSink::InitParameters( TList* fParCList )
{
for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
{
FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
fParCList->Remove( tempObj );
std::string paramName{ tempObj->GetName() };
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Here must come the proper RunId
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req( NewSimpleMessage(message) );
FairMQMessagePtr rep( NewMessage() );
FairParGenericSet* newObj = nullptr;
if( Send(req, "parameters") > 0 )
{
if( Receive( rep, "parameters" ) >= 0)
{
if( 0 != rep->GetSize() )
{
CbmMqTMessage tmsg( rep->GetData(), rep->GetSize() );

Pierre-Alain Loizeau
committed
newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
LOG( info ) << "Received unpack parameter from the server:";
newObj->print();
} // if( 0 != rep->GetSize() )
else
{
LOG( error ) << "Received empty reply. Parameter not available";
return kFALSE;
} // else of if( 0 != rep->GetSize() )
} // if( Receive( rep, "parameters" ) >= 0)
} // if( Send(req, "parameters") > 0 )
fParCList->AddAt( newObj, iparC );
delete tempObj;
} // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
return kTRUE;
}
*/
//--------------------------------------------------------------------//
/// Handler called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and
/// a sub-channel index (unused).
/// The payload is read as a boost binary archive holding a vector of TS indices. These are appended to the
/// list of missed TS before the TS queues are re-checked.
/// @return always true
bool CbmDeviceMcbmEventSink::HandleMissTsData(FairMQMessagePtr& msg, int /*index*/)
{
  std::vector<uint64_t> vIndices;
  std::string msgStrMissTs(static_cast<char*>(msg->GetData()), msg->GetSize());
  std::istringstream issMissTs(msgStrMissTs);
  boost::archive::binary_iarchive inputArchiveMissTs(issMissTs);
  inputArchiveMissTs >> vIndices;

  fvulMissedTsIndices.insert(fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end());

  /// Check TS queue and process it if needed (in case it filled a hole!)
  CheckTsQueues();

  return true;
}
//--------------------------------------------------------------------//
/// Handler called whenever a multipart message arrives on one of the data channels registered in InitTask,
/// with a reference to the message parts and a sub-channel index (unused).
/// A TS directly following the last one written is dumped to the output tree immediately, any other TS is
/// stored in fmFullTsStorage until CheckTsQueues finds it to be the next one.
/// @return always true
bool CbmDeviceMcbmEventSink::HandleData(FairMQParts& parts, int /*index*/)
{
  /// Count the message first: the logs and the publication frequency check below rely on a running counter
  ++fulNumMessages;
  LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts";

  if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";

  /// Extract unpacked data from input message
  uint32_t uPartIdx = 0;

  /// TS metadata
  /// TODO: code order of vectors in the TS MetaData!!
  /*
  std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
                            ( parts.At( uPartIdx ) )->GetSize() );
  std::istringstream issTsMeta(msgStrTsMeta);
  boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
  inputArchiveTsMeta >> (*fTsMetaData);
  ++uPartIdx;
  */
  //  Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
  RootSerializer().Deserialize(*parts.At(uPartIdx), fTsMetaData);
  LOG(debug) << "TS metadata extracted";

  /// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!!
  if (fuPrevTsIndex + 1 == fTsMetaData->GetIndex()
      || (0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex())) {
    LOG(debug) << "TS direct to dump";
    /// Fill all storage variables registers for data output
    PrepareTreeEntry(parts);
    /// Trigger FairRoot manager to dump Tree entry
    DumpTreeEntry();
    /// Update counters
    fuPrevTsIndex = fTsMetaData->GetIndex();
    fulTsCounter++;
  }  // if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex() ) )
  else {
    LOG(debug) << "TS direct to storage";
    /// If not consecutive to last TS sent, keep it in storage until it becomes the next one
    fmFullTsStorage.emplace_hint(fmFullTsStorage.end(), std::pair<uint64_t, CbmUnpackedTimeslice>(
                                                          fTsMetaData->GetIndex(), CbmUnpackedTimeslice(parts)));
  }  // else of if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex() ) )
  LOG(debug) << "TS metadata checked";

  /// Clear metadata => crashes, maybe not needed as due to move the pointer is invalidated?
  //   delete fTsMetaData;

  /// Check TS queue and process it if needed (in case it filled a hole!)
  CheckTsQueues();
  LOG(debug) << "TS queues checked";

  /// Histograms management
  if (kTRUE == fbFillHistos) {
    /// Send histograms each 100 time slices. Should be each ~1s
    /// Use also runtime checker to trigger sending after M s if
    /// processing too slow or delay sending if processing too fast
    std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
    std::chrono::duration<double_t> elapsedSeconds    = currentTime - fLastPublishTime;
    /// A publication frequency of 0 disables the frequency trigger instead of dividing by zero
    const bool bFreqReached = (0 != fuPublishFreqTs) && (0 == fulNumMessages % fuPublishFreqTs);
    if ((fdMaxPublishTime < elapsedSeconds.count()) || (bFreqReached && fdMinPublishTime < elapsedSeconds.count())) {
      SendHistograms();
      fLastPublishTime = std::chrono::system_clock::now();
    }  // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
  }    // if( kTRUE == fbFillHistos )

  return true;
}
//--------------------------------------------------------------------//
/// Handler called whenever a message arrives on fsChannelNameCommands, with a reference to the message and
/// a sub-channel index (unused).
/// The payload is read as a boost binary archive holding a single string. Recognised commands are
/// - "EOF <last TS index> <total TS count>": stores both numbers and finishes right away if they are already reached,
/// - "STOP": finishes immediately.
/// Any other command is ignored with a warning.
/// @return false for an incomplete or unparsable EOF command, true otherwise
bool CbmDeviceMcbmEventSink::HandleCommand(FairMQMessagePtr& msg, int /*index*/)
{
  /*
  std::string sCommand( static_cast< char * >( msg->GetData() ),
                        msg->GetSize() );
  */
  std::string sCommand;
  std::string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
  std::istringstream issCmd(msgStrCmd);
  boost::archive::binary_iarchive inputArchiveCmd(issCmd);
  inputArchiveCmd >> sCommand;

  /// The tag is everything up to the first blank (the whole command if there is none)
  const size_t charPosDel   = sCommand.find(' ');
  const std::string sCmdTag = sCommand.substr(0, charPosDel);

  if ("EOF" == sCmdTag) {
    /// Extract the last TS index and global full TS count: both blanks must be present
    const size_t charPosSecondDel =
      (std::string::npos == charPosDel) ? std::string::npos : sCommand.find(' ', charPosDel + 1);
    if (std::string::npos == charPosSecondDel) {
      LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
                 << "Incomplete EOF command received: " << sCommand;
      return false;
    }  // if( std::string::npos == charPosSecondDel )

    /// Parse into locals first so that a bad command leaves the members untouched
    /// and so that the std::stoul exceptions do not escape the handler
    try {
      /// Last TS index
      const auto ulLastTsIndex = std::stoul(sCommand.substr(charPosDel + 1, charPosSecondDel - charPosDel - 1));
      /// Total TS count
      const auto ulTotalTsCount = std::stoul(sCommand.substr(charPosSecondDel + 1));

      fuLastTsIndex  = ulLastTsIndex;
      fuTotalTsCount = ulTotalTsCount;
    }  // try
    catch (const std::exception& e) {
      LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
                 << "Invalid EOF command received: " << sCommand << " (" << e.what() << ")";
      return false;
    }  // catch( const std::exception& e )
    fbReceivedEof = true;

    LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
              << "Received EOF command with final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;

    /// End of data: clean save of data + close file + send last state of histos if enabled
    if (fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) {
      LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
                << "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
      Finish();
    }  // if( fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
  }    // if( "EOF" == sCmdTag )
  else if ("STOP" == sCmdTag) {
    /// TODO: different treatment in case of "BAD" ending compared to EOF?
    /// Source failure: clean save of received data + close file + send last state of histos if enabled
    Finish();
  }  // else if( "STOP" == sCmdTag )
  else {
    LOG(warning) << "Unknown command received: " << sCmdTag << " => will be ignored!";
  }  // else if command not recognized

  return true;
}
//--------------------------------------------------------------------//
/// Write out, in increasing TS index order, every buffered entry that directly follows the last TS written:
/// full TS are taken from fmFullTsStorage, missed TS from fvulMissedTsIndices. The loop stops as soon as
/// neither queue holds the index fuPrevTsIndex + 1. Processed entries are then removed from both queues and
/// Finish() is called if the EOF command was received and the final index and TS count are reached.
/// NOTE(review): the "Pierre-Alain Loizeau" / "committed" lines in this block are not C++; they look like
/// residue from a repository blame view and have to be removed before this compiles.
void CbmDeviceMcbmEventSink::CheckTsQueues()
{

Pierre-Alain Loizeau
committed
/// Both iterators start at the beginning of their queue and only move forward
std::map<uint64_t, CbmUnpackedTimeslice>::iterator itFullTs = fmFullTsStorage.begin();
std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin();

Pierre-Alain Loizeau
committed
/// NOTE(review): bHoleFoundInBothQueues is not declared in the visible code, presumably a local bool
/// initialised to false is missing here - confirm against the repository version
while (!bHoleFoundInBothQueues) {
/// Check if the first TS in the full TS queue is the next one
if (fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first) {
/// Fill all storage variables registers for data output
/// (PrepareTreeEntry takes its argument by value, so the stored timeslice is copied here)
PrepareTreeEntry((*itFullTs).second);
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();

Pierre-Alain Loizeau
committed
/// Update counters
fuPrevTsIndex = (*itFullTs).first;
fulTsCounter++;

Pierre-Alain Loizeau
committed
/// Increment iterator
++itFullTs;
continue;
} // if( fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first() )
/// Check if the first TS in the missed TS queue is the next one
if (fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs)) {
/// Prepare entry with only dummy TS metadata and empty storage variables
/// NOTE(review): the placement-new below has no constructor call in the visible code, the
/// TimesliceMetaData construction that should follow it appears to be missing
new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])

Pierre-Alain Loizeau
committed
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();

Pierre-Alain Loizeau
committed
/// Update counters
fuPrevTsIndex = (*itMissTs);
fulMissedTsCounter++;
/// Increment iterator
++itMissTs;
continue;
} // if( fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs ) )
/// Should be reached only if both queues at the end or hole found in both
bHoleFoundInBothQueues = true;
} // while( !bHoleFoundInBothQueues )
/// Delete the processed entries
fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs);
fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs);
/// End of data: clean save of data + close file + send last state of histos if enabled
if (fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) {
LOG(info) << "CbmDeviceMcbmEventSink::CheckTsQueues => "
<< "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
Finish();
} // if( fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )

Pierre-Alain Loizeau
committed
}
//--------------------------------------------------------------------//
/// Fill the output buffers registered with the FairRoot manager from one unpacked timeslice:
/// the TS metadata is appended to fTimeSliceMetaDataArray, the digi vectors are move-assigned into the
/// output vectors and the events are absorbed into fEventsArray.
/// @param unpTs unpacked timeslice, taken by value so that its content can be moved out
void CbmDeviceMcbmEventSink::PrepareTreeEntry(CbmUnpackedTimeslice unpTs)
{
  /// FIXME: poor man solution with lots of data copy until we understand how to properly deal
  /// with FairMq messages ownership and memory management

  /// FIXME: Not sure if this is the proper way to insert the data
  new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
    TimesliceMetaData(std::move(unpTs.fTsMetaData));

  /*
  /// Explicit copy version: safe but slow
  /// Bmon
  fvDigiBmon->insert( fvDigiBmon->end(), unpTs.fvDigiBmon.begin(), unpTs.fvDigiBmon.end() );
  /// STS
  fvDigiSts->insert( fvDigiSts->end(), unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end() );
  /// MUCH
  fvDigiMuch->insert( fvDigiMuch->end(), unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end() );
  /// TRD
  fvDigiTrd->insert( fvDigiTrd->end(), unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end() );
  /// TOF
  fvDigiTof->insert( fvDigiTof->end(), unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end() );
  /// RICH
  fvDigiRich->insert( fvDigiRich->end(), unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end() );
  /// PSD
  fvDigiPsd->insert( fvDigiPsd->end(), unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end() );
  */

  /// Move version: the previous content of each output vector is replaced
  /// Bmon
  (*fvDigiBmon) = std::move(unpTs.fvDigiBmon);
  /// STS
  (*fvDigiSts) = std::move(unpTs.fvDigiSts);
  /// MUCH
  (*fvDigiMuch) = std::move(unpTs.fvDigiMuch);
  /// TRD
  (*fvDigiTrd) = std::move(unpTs.fvDigiTrd);
  /// TOF
  (*fvDigiTof) = std::move(unpTs.fvDigiTof);
  /// RICH
  (*fvDigiRich) = std::move(unpTs.fvDigiRich);
  /// PSD
  (*fvDigiPsd) = std::move(unpTs.fvDigiPsd);

  /// Extract CbmEvent TClonesArray from input message
  fEventsArray->AbsorbObjects(&(unpTs.fEventsArray));
}
void CbmDeviceMcbmEventSink::DumpTreeEntry()
{

Pierre-Alain Loizeau
committed
* NH style
// fpFairRootMgr->FillEventHeader(fEvtHeader);
// LOG(info) << "Fill WriteOutBuffer with FairRootManager at " << fpFairRootMgr;
// fpOutRootFile->cd();
fpFairRootMgr->Fill();
fpFairRootMgr->StoreWriteoutBufferData( fpFairRootMgr->GetEventTime() );
//fpFairRootMgr->StoreAllWriteoutBufferData();
fpFairRootMgr->DeleteOldWriteoutBufferData();
*/
/// FairRunOnline style
fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime());
fpFairRootMgr->Fill();
fpFairRootMgr->DeleteOldWriteoutBufferData();
/// Clear metadata array
fTimeSliceMetaDataArray->Clear();
/// Clear vectors
fvDigiSts->clear();
fvDigiMuch->clear();
fvDigiTrd->clear();
fvDigiTof->clear();
fvDigiRich->clear();
fvDigiPsd->clear();
/// Clear event array
// fEventsArray->Delete();
fEventsArray->Clear("C");
// fEventsArray->Clear();

Pierre-Alain Loizeau
committed
}
//--------------------------------------------------------------------//
bool CbmDeviceMcbmEventSink::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);

Pierre-Alain Loizeau
committed
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )

Pierre-Alain Loizeau
committed
/// Reset the histograms after sending them (but do not reset the time)
// fpAlgo->ResetHistograms( kFALSE );

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
}
//--------------------------------------------------------------------//
/// Destructor: closes the output if Finish() was not run yet, then empties and releases the buffers.
/// Every pointer is checked before being dereferenced, as some of them are only set in InitTask.
CbmDeviceMcbmEventSink::~CbmDeviceMcbmEventSink()
{
  /// Close things properly if not already done; without a FairRoot manager there is no output to close
  if (!fbFinishDone && nullptr != fpFairRootMgr) Finish();

  /// Clear metadata
  if (nullptr != fTimeSliceMetaDataArray) fTimeSliceMetaDataArray->Clear();
  delete fTimeSliceMetaDataArray;
  delete fTsMetaData;

  /// Clear vectors
  const auto clearIfSet = [](auto* pVector) {
    if (nullptr != pVector) pVector->clear();
  };
  clearIfSet(fvDigiBmon);
  clearIfSet(fvDigiSts);
  clearIfSet(fvDigiMuch);
  clearIfSet(fvDigiTrd);
  clearIfSet(fvDigiTof);
  clearIfSet(fvDigiRich);
  clearIfSet(fvDigiPsd);

  /// Clear events TClonesArray
  if (nullptr != fEventsArray) fEventsArray->Clear();
  delete fEventsArray;

  delete fpRun;
}
/// Write and close the ROOT output, send a last histogram update if enabled, then drive the device
/// through the Stop and End transitions.
/// The function is called from HandleCommand (EOF and STOP), CheckTsQueues and the destructor;
/// only the first call does the work, later calls return immediately.
void CbmDeviceMcbmEventSink::Finish()
{
  /// Writing to or closing an already closed sink must be avoided
  if (fbFinishDone) return;
  fbFinishDone = kTRUE;

  // Clean closure of output to root file
  if (nullptr != fpFairRootMgr) {
    fpFairRootMgr->Write();
    //  fpFairRootMgr->GetSource()->Close();
    fpFairRootMgr->CloseSink();
    LOG(info) << "File closed after saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter
              << " full ones and " << fulMissedTsCounter << " missed/empty ones)";
  }  // if( nullptr != fpFairRootMgr )

  if (kTRUE == fbFillHistos) {
    SendHistograms();
    fLastPublishTime = std::chrono::system_clock::now();
  }  // if( kTRUE == fbFillHistos )

  ChangeState(fair::mq::Transition::Stop);
  /// Wait between the two transitions
  std::this_thread::sleep_for(std::chrono::milliseconds(3000));
  ChangeState(fair::mq::Transition::End);
}
CbmUnpackedTimeslice::CbmUnpackedTimeslice(FairMQParts& parts) : fEventsArray("CbmEvent", 500)
{
/// Extract unpacked data from input message
uint32_t uPartIdx = 0;
/// TS metadata
/// TODO: code order of vectors in the TS MetaData!!
/*

Pierre-Alain Loizeau
committed
std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
( parts.At( uPartIdx ) )->GetSize() );
std::istringstream issTsMeta(msgStrTsMeta);
boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
inputArchiveTsMeta >> (*fTsMetaData);
++uPartIdx;
*/
TObject* tempObjectMeta = nullptr;
RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectMeta);
++uPartIdx;

Pierre-Alain Loizeau
committed
if (TString(tempObjectMeta->ClassName()).EqualTo("TimesliceMetaData")) {
fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectMeta));
} // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )

Pierre-Alain Loizeau
committed
/// Bmon
std::string msgStrBmon(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issBmon(msgStrBmon);
boost::archive::binary_iarchive inputArchiveBmon(issBmon);
inputArchiveBmon >> fvDigiBmon;

Pierre-Alain Loizeau
committed
std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issSts(msgStrSts);
boost::archive::binary_iarchive inputArchiveSts(issSts);
inputArchiveSts >> fvDigiSts;
++uPartIdx;

Pierre-Alain Loizeau
committed
std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issMuch(msgStrMuch);
boost::archive::binary_iarchive inputArchiveMuch(issMuch);
inputArchiveMuch >> fvDigiMuch;
++uPartIdx;

Pierre-Alain Loizeau
committed
std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issTrd(msgStrTrd);
boost::archive::binary_iarchive inputArchiveTrd(issTrd);
inputArchiveTrd >> fvDigiTrd;
++uPartIdx;

Pierre-Alain Loizeau
committed
std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issTof(msgStrTof);
boost::archive::binary_iarchive inputArchiveTof(issTof);
inputArchiveTof >> fvDigiTof;
++uPartIdx;

Pierre-Alain Loizeau
committed
std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issRich(msgStrRich);
boost::archive::binary_iarchive inputArchiveRich(issRich);
inputArchiveRich >> fvDigiRich;
++uPartIdx;

Pierre-Alain Loizeau
committed
std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issPsd(msgStrPsd);
boost::archive::binary_iarchive inputArchivePsd(issPsd);
inputArchivePsd >> fvDigiPsd;
++uPartIdx;
/// Extract CbmEvent TClonesArray from input message
TObject* tempObject = nullptr;
RootSerializer().Deserialize(*parts.At(uPartIdx), tempObject);
++uPartIdx;
if (TString(tempObject->ClassName()).EqualTo("TClonesArray")) {
TClonesArray* arrayEventsIn = static_cast<TClonesArray*>(tempObject);

Pierre-Alain Loizeau
committed
/// Copy data in registered TClonesArray (by taking ownership!)
fEventsArray.AbsorbObjects(arrayEventsIn);
} // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )

Pierre-Alain Loizeau
committed
}

Pierre-Alain Loizeau
committed
/// Destructor: empties all digi vectors and deletes the events still held by the TClonesArray.
CbmUnpackedTimeslice::~CbmUnpackedTimeslice()
{
  fvDigiBmon.clear();
  fvDigiSts.clear();
  fvDigiMuch.clear();
  fvDigiTrd.clear();
  fvDigiTof.clear();
  fvDigiRich.clear();
  fvDigiPsd.clear();
  //   fEventsArray.Clear("C");
  fEventsArray.Delete();
}