-
Updates T0 digi classes with Bmon exquivalents. Standardises the use of Bmon across code base and, in-use and legacy macros.
Updates T0 digi classes with Bmon exquivalents. Standardises the use of Bmon across code base and, in-use and legacy macros.
CbmDeviceMcbmEventSink.cxx 30.31 KiB
/* Copyright (C) 2020-2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
/**
* CbmDeviceMcbmEventSink.cxx
*
* @since 2020-05-24
* @author P.-A. Loizeau
*/
#include "CbmDeviceMcbmEventSink.h"
/// CBM headers
#include "CbmEvent.h"
#include "CbmFlesCanvasTools.h"
#include "CbmMQDefs.h"
#include "TimesliceMetaData.h"
/// FAIRROOT headers
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "FairRootFileSink.h"
#include "FairRootManager.h"
#include "FairRunOnline.h"
#include "BoostSerializer.h"
#include "RootSerializer.h"
/// FAIRSOFT headers (geant, boost, ...)
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>
/// C/C++ headers
#include <thread> // this_thread::sleep_for
#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
using namespace std;
//Bool_t bMcbm2018MonitorTaskBmonResetHistos = kFALSE;
CbmDeviceMcbmEventSink::CbmDeviceMcbmEventSink() {}
void CbmDeviceMcbmEventSink::InitTask()
try {
/// Read options from executable
LOG(info) << "Init options for CbmDeviceMcbmEventSink.";
fsOutputFileName = fConfig->GetValue<std::string>("OutFileName");
fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn");
fsAllowedChannels[0] = fsChannelNameDataInput;
fbFillHistos = fConfig->GetValue<bool>("FillHistos");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
/// Associate the MissedTs Channel to the corresponding handler
OnData(fsChannelNameMissedTs, &CbmDeviceMcbmEventSink::HandleMissTsData);
/// Associate the command Channel to the corresponding handler
OnData(fsChannelNameCommands, &CbmDeviceMcbmEventSink::HandleCommand);
/// Associate the Event + Unp data Channel to the corresponding handler
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDeviceMcbmEventSink::HandleData);
} // if( entry.first.find( "ts" )
} // for( auto const &entry : fChannels )
// InitContainers();
/// Create input vectors
fvDigiBmon = new std::vector<CbmTofDigi>();
fvDigiSts = new std::vector<CbmStsDigi>();
fvDigiMuch = new std::vector<CbmMuchBeamTimeDigi>();
fvDigiTrd = new std::vector<CbmTrdDigi>();
fvDigiTof = new std::vector<CbmTofDigi>();
fvDigiRich = new std::vector<CbmRichDigi>();
fvDigiPsd = new std::vector<CbmPsdDigi>();
/// Prepare storage TClonesArrays
/// TS MetaData storage
fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
if (NULL == fTimeSliceMetaDataArray) {
throw InitTaskError("Failed creating the TS meta data TClonesarray ");
} // if( NULL == fTimeSliceMetaDataArray )
/// Events storage
/// TODO: remove TObject from CbmEvent and switch to vectors!
fEventsArray = new TClonesArray("CbmEvent", 500);
if (NULL == fEventsArray) {
throw InitTaskError("Failed creating the Events TClonesarray ");
} // if( NULL == fEventsArray )
/// Prepare root output
if ("" != fsOutputFileName) {
fpRun = new FairRunOnline();
fpFairRootMgr = FairRootManager::Instance();
fpFairRootMgr->SetSink(new FairRootFileSink(fsOutputFileName));
if (nullptr == fpFairRootMgr->GetOutFile()) {
throw InitTaskError("Could not open root file");
} // if( nullptr == fpFairRootMgr->GetOutFile() )
} // if( "" != fsOutputFileName )
else {
throw InitTaskError("Empty output filename!");
} // else of if( "" != fsOutputFileName )
LOG(info) << "Init Root Output to " << fsOutputFileName;
fpFairRootMgr->InitSink();
// fEvtHeader = new FairEventHeader();
// fEvtHeader->SetRunId(iRunId);
// rootMgr->Register("EventHeader.", "Event", fEvtHeader, kTRUE);
// rootMgr->FillEventHeader(fEvtHeader);
/// Register all input data members with the FairRoot manager
/// TS MetaData
fpFairRootMgr->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kTRUE);
/// Digis storage
fpFairRootMgr->RegisterAny("BmonDigi", fvDigiBmon, kTRUE);
fpFairRootMgr->RegisterAny("StsDigi", fvDigiSts, kTRUE);
fpFairRootMgr->RegisterAny("MuchBeamTimeDigi", fvDigiMuch, kTRUE);
fpFairRootMgr->RegisterAny("TrdDigi", fvDigiTrd, kTRUE);
fpFairRootMgr->RegisterAny("TofDigi", fvDigiTof, kTRUE);
fpFairRootMgr->RegisterAny("RichDigi", fvDigiRich, kTRUE);
fpFairRootMgr->RegisterAny("PsdDigi", fvDigiPsd, kTRUE);
/// CbmEvent
fpFairRootMgr->Register("CbmEvent", "Cbm Event", fEventsArray, kTRUE);
/*
TTree* outTree =new TTree(FairRootManager::GetTreeName(), "/cbmout", 99);
LOG(info) << "define Tree " << outTree->GetName();
fpFairRootMgr->GetSink()->SetOutTree(outTree);
*/
fpFairRootMgr->WriteFolder();
LOG(info) << "Initialized outTree with rootMgr at " << fpFairRootMgr;
/// Histograms management
if (kTRUE == fbFillHistos) {
/*
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector< std::pair< TNamed *, std::string > > vHistos = fpAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector< std::pair< TCanvas *, std::string > > vCanvases = fpAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
{
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add( vHistos[ uHisto ].first );
std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
vHistos[ uHisto ].second );
fvpsHistosFolder.push_back( psHistoConfig );
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist( NewMessage() );
// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
BoostSerializer < std::pair< std::string, std::string > >.Serialize( *messageHist, psHistoConfig );
/// Send message to the common histogram config messages queue
if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
{
throw InitTaskError( "Problem sending histo config" );
} // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
LOG(info) << "Config of hist " << psHistoConfig.first.data()
<< " in folder " << psHistoConfig.second.data() ;
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
{
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
fvpsCanvasConfig.push_back( psCanvConfig );
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan( NewMessage() );
// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
BoostSerializer < std::pair< std::string, std::string > >.Serialize( *messageCan, psCanvConfig );
/// Send message to the common canvas config messages queue
if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
{
throw InitTaskError( "Problem sending canvas config" );
} // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
<< " is " << psCanvConfig.second.data() ;
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
*/
} // if( kTRUE == fbFillHistos )
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmDeviceMcbmEventSink::IsChannelNameAllowed(std::string channelName)
{
for (auto const& entry : fsAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
} // if (pos1!=std::string::npos)
} // for(auto const &entry : fsAllowedChannels)
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
/*
Bool_t CbmDeviceMcbmEventSink::InitContainers()
{
LOG(info) << "Init parameter containers for CbmDeviceMcbmEventSink.";
if( kFALSE == InitParameters( fpAlgo ->GetParList() ) )
return kFALSE;
/// Need to add accessors for all options
fpAlgo ->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
Bool_t initOK = fpAlgo->InitContainers();
// Bool_t initOK = fMonitorAlgo->ReInitContainers();
return initOK;
}
Bool_t CbmDeviceMcbmEventSink::InitParameters( TList* fParCList )
{
for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
{
FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
fParCList->Remove( tempObj );
std::string paramName{ tempObj->GetName() };
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Her must come the proper Runid
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req( NewSimpleMessage(message) );
FairMQMessagePtr rep( NewMessage() );
FairParGenericSet* newObj = nullptr;
if( Send(req, "parameters") > 0 )
{
if( Receive( rep, "parameters" ) >= 0)
{
if( 0 != rep->GetSize() )
{
CbmMqTMessage tmsg( rep->GetData(), rep->GetSize() );
newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
LOG( info ) << "Received unpack parameter from the server:";
newObj->print();
} // if( 0 != rep->GetSize() )
else
{
LOG( error ) << "Received empty reply. Parameter not available";
return kFALSE;
} // else of if( 0 != rep->GetSize() )
} // if( Receive( rep, "parameters" ) >= 0)
} // if( Send(req, "parameters") > 0 )
fParCList->AddAt( newObj, iparC );
delete tempObj;
} // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
return kTRUE;
}
*/
//--------------------------------------------------------------------//
// handler is called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMcbmEventSink::HandleMissTsData(FairMQMessagePtr& msg, int /*index*/)
{
std::vector<uint64_t> vIndices;
std::string msgStrMissTs(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream issMissTs(msgStrMissTs);
boost::archive::binary_iarchive inputArchiveMissTs(issMissTs);
inputArchiveMissTs >> vIndices;
fvulMissedTsIndices.insert(fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end());
/// Check TS queue and process it if needed (in case it filled a hole!)
CheckTsQueues();
return true;
}
//--------------------------------------------------------------------//
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMcbmEventSink::HandleData(FairMQParts& parts, int /*index*/)
{
fulNumMessages++;
LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts"
<< ", size0: " << parts.At(0)->GetSize();
if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
/// Extract unpacked data from input message
uint32_t uPartIdx = 0;
/// TS metadata
/// TODO: code order of vectors in the TS MetaData!!
/*
std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
( parts.At( uPartIdx ) )->GetSize() );
std::istringstream issTsMeta(msgStrTsMeta);
boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
inputArchiveTsMeta >> (*fTsMetaData);
++uPartIdx;
*/
// Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
RootSerializer().Deserialize(*parts.At(uPartIdx), fTsMetaData);
LOG(debug) << "TS metadata extracted";
/// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!!
if (fuPrevTsIndex + 1 == fTsMetaData->GetIndex()
|| (0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex())) {
LOG(debug) << "TS direct to dump";
/// Fill all storage variables registers for data output
PrepareTreeEntry(parts);
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = fTsMetaData->GetIndex();
fulTsCounter++;
} // if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter ) )
else {
LOG(debug) << "TS direct to storage";
/// If not consecutive to last TS sent,
fmFullTsStorage.emplace_hint(fmFullTsStorage.end(), std::pair<uint64_t, CbmUnpackedTimeslice>(
fTsMetaData->GetIndex(), CbmUnpackedTimeslice(parts)));
} // else of if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex() )
LOG(debug) << "TS metadata checked";
/// Clear metadata => crashes, maybe not needed as due to move the pointer is invalidated?
// delete fTsMetaData;
/// Check TS queue and process it if needed (in case it filled a hole!)
CheckTsQueues();
LOG(debug) << "TS queues checked";
/// Histograms management
if (kTRUE == fbFillHistos) {
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
} // if( kTRUE == fbFillHistos )
return true;
}
//--------------------------------------------------------------------//
bool CbmDeviceMcbmEventSink::HandleCommand(FairMQMessagePtr& msg, int /*index*/)
{
/*
std::string sCommand( static_cast< char * >( msg->GetData() ),
msg->GetSize() );
*/
std::string sCommand;
std::string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream issCmd(msgStrCmd);
boost::archive::binary_iarchive inputArchiveCmd(issCmd);
inputArchiveCmd >> sCommand;
std::string sCmdTag = sCommand;
size_t charPosDel = sCommand.find(' ');
if (std::string::npos != charPosDel) {
sCmdTag = sCommand.substr(0, charPosDel);
} // if( std::string::npos != charPosDel )
if ("EOF" == sCmdTag) {
fbReceivedEof = true;
/// Extract the last TS index and global full TS count
if (std::string::npos == charPosDel) {
LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
<< "Incomplete EOF command received: " << sCommand;
return false;
} // if( std::string::npos == charPosDel )
/// Last TS index
charPosDel++;
std::string sNext = sCommand.substr(charPosDel);
charPosDel = sNext.find(' ');
if (std::string::npos == charPosDel) {
LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
<< "Incomplete EOF command received: " << sCommand;
return false;
} // if( std::string::npos == charPosDel )
fuLastTsIndex = std::stoul(sNext.substr(0, charPosDel));
/// Total TS count
charPosDel++;
fuTotalTsCount = std::stoul(sNext.substr(charPosDel));
LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
<< "Received EOF command with final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
/// End of data: clean save of data + close file + send last state of histos if enabled
if (fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) {
LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
<< "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
Finish();
} // if( fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
} // if( "EOF" == sCmdTag )
else if ("STOP" == sCmdTag) {
/// TODO: different treatment in case of "BAD" ending compared to EOF?
/// Source failure: clean save of received data + close file + send last state of histos if enabled
Finish();
} // else if( "STOP" == sCmdTag )
else {
LOG(warning) << "Unknown command received: " << sCmdTag << " => will be ignored!";
} // else if command not recognized
return true;
}
//--------------------------------------------------------------------//
void CbmDeviceMcbmEventSink::CheckTsQueues()
{
bool bHoleFoundInBothQueues = false;
std::map<uint64_t, CbmUnpackedTimeslice>::iterator itFullTs = fmFullTsStorage.begin();
std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin();
while (!bHoleFoundInBothQueues) {
/// Check if the first TS in the full TS queue is the next one
if (fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first) {
/// Fill all storage variables registers for data output
PrepareTreeEntry((*itFullTs).second);
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = (*itFullTs).first;
fulTsCounter++;
/// Increment iterator
++itFullTs;
continue;
} // if( fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first() )
/// Check if the first TS in the missed TS queue is the next one
if (fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs)) {
/// Prepare entry with only dummy TS metadata and empty storage variables
new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
TimesliceMetaData(0, 0, 0, (*itMissTs));
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = (*itMissTs);
fulMissedTsCounter++;
/// Increment iterator
++itMissTs;
continue;
} // if( fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs ) )
/// Should be reached only if both queues at the end or hole found in both
bHoleFoundInBothQueues = true;
} // while( !bHoleFoundInBothQueues )
/// Delete the processed entries
fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs);
fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs);
/// End of data: clean save of data + close file + send last state of histos if enabled
if (fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) {
LOG(info) << "CbmDeviceMcbmEventSink::CheckTsQueues => "
<< "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
Finish();
} // if( fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
}
//--------------------------------------------------------------------//
void CbmDeviceMcbmEventSink::PrepareTreeEntry(CbmUnpackedTimeslice unpTs)
{
/// FIXME: poor man solution with lots of data copy until we undertsnad how to properly deal
/// with FairMq messages ownership and memory managment
/// FIXME: Not sure if this is the proper way to insert the data
new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
TimesliceMetaData(std::move(unpTs.fTsMetaData));
/*
/// Explicit copy version: safe but slow
/// Bmon
fvDigiBmon->insert( fvDigiBmon->end(), unpTs.fvDigiBmon.begin(), unpTs.fvDigiBmon.end() );
/// STS
fvDigiSts->insert( fvDigiSts->end(), unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end() );
/// MUCH
fvDigiMuch->insert( fvDigiMuch->end(), unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end() );
/// TRD
fvDigiTrd->insert( fvDigiTrd->end(), unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end() );
/// BmonF
fvDigiTof->insert( fvDigiTof->end(), unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end() );
/// RICH
fvDigiRich->insert( fvDigiRich->end(), unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end() );
/// PSD
fvDigiPsd->insert( fvDigiPsd->end(), unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end() );
*/
/// move version: safe but slow
/// Bmon
(*fvDigiBmon) = std::move(unpTs.fvDigiBmon);
/// STS
(*fvDigiSts) = std::move(unpTs.fvDigiSts);
/// MUCH
(*fvDigiMuch) = std::move(unpTs.fvDigiMuch);
/// TRD
(*fvDigiTrd) = std::move(unpTs.fvDigiTrd);
/// BmonF
(*fvDigiTof) = std::move(unpTs.fvDigiTof);
/// RICH
(*fvDigiRich) = std::move(unpTs.fvDigiRich);
/// PSD
(*fvDigiPsd) = std::move(unpTs.fvDigiPsd);
/// Extract CbmEvent TClonesArray from input message
fEventsArray->AbsorbObjects(&(unpTs.fEventsArray));
}
void CbmDeviceMcbmEventSink::DumpTreeEntry()
{
// Unpacked digis + CbmEvent output to root file
/*
* NH style
// fpFairRootMgr->FillEventHeader(fEvtHeader);
// LOG(info) << "Fill WriteOutBuffer with FairRootManager at " << fpFairRootMgr;
// fpOutRootFile->cd();
fpFairRootMgr->Fill();
fpFairRootMgr->StoreWriteoutBufferData( fpFairRootMgr->GetEventTime() );
//fpFairRootMgr->StoreAllWriteoutBufferData();
fpFairRootMgr->DeleteOldWriteoutBufferData();
*/
/// FairRunOnline style
fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime());
fpFairRootMgr->Fill();
fpFairRootMgr->DeleteOldWriteoutBufferData();
/// Clear metadata array
fTimeSliceMetaDataArray->Clear();
/// Clear vectors
fvDigiBmon->clear();
fvDigiSts->clear();
fvDigiMuch->clear();
fvDigiTrd->clear();
fvDigiTof->clear();
fvDigiRich->clear();
fvDigiPsd->clear();
/// Clear event array
// fEventsArray->Delete();
fEventsArray->Clear("C");
// fEventsArray->Clear();
}
//--------------------------------------------------------------------//
bool CbmDeviceMcbmEventSink::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
// fpAlgo->ResetHistograms( kFALSE );
return true;
}
//--------------------------------------------------------------------//
CbmDeviceMcbmEventSink::~CbmDeviceMcbmEventSink()
{
/// FIXME: Add pointers check before delete
/// Close things properly if not alredy done
if (!fbFinishDone) Finish();
/// Clear metadata
fTimeSliceMetaDataArray->Clear();
delete fTimeSliceMetaDataArray;
delete fTsMetaData;
/// Clear vectors
fvDigiBmon->clear();
fvDigiSts->clear();
fvDigiMuch->clear();
fvDigiTrd->clear();
fvDigiTof->clear();
fvDigiRich->clear();
fvDigiPsd->clear();
/// Clear events TClonesArray
fEventsArray->Clear();
delete fEventsArray;
delete fpRun;
}
void CbmDeviceMcbmEventSink::Finish()
{
// Clean closure of output to root file
fpFairRootMgr->Write();
// fpFairRootMgr->GetSource()->Close();
fpFairRootMgr->CloseSink();
LOG(info) << "File closed after saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter
<< " full ones and " << fulMissedTsCounter << " missed/empty ones)";
if (kTRUE == fbFillHistos) {
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( kTRUE == fbFillHistos )
ChangeState(fair::mq::Transition::Stop);
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
ChangeState(fair::mq::Transition::End);
fbFinishDone = kTRUE;
}
CbmUnpackedTimeslice::CbmUnpackedTimeslice(FairMQParts& parts) : fEventsArray("CbmEvent", 500)
{
/// Extract unpacked data from input message
uint32_t uPartIdx = 0;
/// TS metadata
/// TODO: code order of vectors in the TS MetaData!!
/*
std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
( parts.At( uPartIdx ) )->GetSize() );
std::istringstream issTsMeta(msgStrTsMeta);
boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
inputArchiveTsMeta >> (*fTsMetaData);
++uPartIdx;
*/
TObject* tempObjectMeta = nullptr;
RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectMeta);
++uPartIdx;
if (TString(tempObjectMeta->ClassName()).EqualTo("TimesliceMetaData")) {
fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectMeta));
} // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )
/// Bmon
std::string msgStrBmon(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issBmon(msgStrBmon);
boost::archive::binary_iarchive inputArchiveBmon(issBmon);
inputArchiveBmon >> fvDigiBmon;
++uPartIdx;
/// STS
std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issSts(msgStrSts);
boost::archive::binary_iarchive inputArchiveSts(issSts);
inputArchiveSts >> fvDigiSts;
++uPartIdx;
/// MUCH
std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issMuch(msgStrMuch);
boost::archive::binary_iarchive inputArchiveMuch(issMuch);
inputArchiveMuch >> fvDigiMuch;
++uPartIdx;
/// TRD
std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issTrd(msgStrTrd);
boost::archive::binary_iarchive inputArchiveTrd(issTrd);
inputArchiveTrd >> fvDigiTrd;
++uPartIdx;
/// BmonF
std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issTof(msgStrTof);
boost::archive::binary_iarchive inputArchiveTof(issTof);
inputArchiveTof >> fvDigiTof;
++uPartIdx;
/// RICH
std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issRich(msgStrRich);
boost::archive::binary_iarchive inputArchiveRich(issRich);
inputArchiveRich >> fvDigiRich;
++uPartIdx;
/// PSD
std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
std::istringstream issPsd(msgStrPsd);
boost::archive::binary_iarchive inputArchivePsd(issPsd);
inputArchivePsd >> fvDigiPsd;
++uPartIdx;
/// Extract CbmEvent TClonesArray from input message
TObject* tempObject = nullptr;
RootSerializer().Deserialize(*parts.At(uPartIdx), tempObject);
++uPartIdx;
if (TString(tempObject->ClassName()).EqualTo("TClonesArray")) {
TClonesArray* arrayEventsIn = static_cast<TClonesArray*>(tempObject);
/// Copy data in registered TClonesArray (by taking ownership!)
fEventsArray.AbsorbObjects(arrayEventsIn);
} // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )
}
CbmUnpackedTimeslice::~CbmUnpackedTimeslice()
{
fvDigiBmon.clear();
fvDigiSts.clear();
fvDigiMuch.clear();
fvDigiTrd.clear();
fvDigiTof.clear();
fvDigiRich.clear();
fvDigiPsd.clear();
// fEventsArray.Clear("C");
fEventsArray.Delete();
}