-
Cleanup filtering of compiler warnings for Nightly tests Remove unused variables, parameters and data members. Remove move statement to allow copy ellision. Use unsingned int in loop when comparing with size function of containers. Initialize all data members. Fix warning from rootcling. With newer root versions the parameter -c isn't supported any longer, so remove it.
Cleanup filtering of compiler warnings for Nightly tests Remove unused variables, parameters and data members. Remove move statement to allow copy ellision. Use unsingned int in loop when comparing with size function of containers. Initialize all data members. Fix warning from rootcling. With newer root versions the parameter -c isn't supported any longer, so remove it.
CbmDeviceMcbmEventSink.cxx 30.17 KiB
/**
* CbmDeviceMcbmEventSink.cxx
*
* @since 2020-05-24
* @author P.-A. Loizeau
*/
#include "CbmDeviceMcbmEventSink.h"
/// CBM headers
#include "CbmMQDefs.h"
#include "CbmEvent.h"
#include "CbmFlesCanvasTools.h"
#include "TimesliceMetaData.h"
/// FAIRROOT headers
#include "BoostSerializer.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "FairRootFileSink.h"
#include "FairRootManager.h"
#include "FairRunOnline.h"
#include "RootSerializer.h"
/// FAIRSOFT headers (geant, boost, ...)
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>
/// C/C++ headers
#include <array>
#include <iomanip>
#include <string>
#include <thread> // this_thread::sleep_for
#include <stdexcept>
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
using namespace std;
//Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE;
CbmDeviceMcbmEventSink::CbmDeviceMcbmEventSink() {}
void CbmDeviceMcbmEventSink::InitTask() try {
/// Read options from executable
LOG(info) << "Init options for CbmDeviceMcbmEventSink.";
fsOutputFileName = fConfig->GetValue<std::string>("OutFileName");
fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn");
fsAllowedChannels[0] = fsChannelNameDataInput;
fbFillHistos = fConfig->GetValue<bool>("FillHistos");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
/// Associate the MissedTs Channel to the corresponding handler
OnData(fsChannelNameMissedTs, &CbmDeviceMcbmEventSink::HandleMissTsData);
/// Associate the command Channel to the corresponding handler
OnData(fsChannelNameCommands, &CbmDeviceMcbmEventSink::HandleCommand);
/// Associate the Event + Unp data Channel to the corresponding handler
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first))
throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDeviceMcbmEventSink::HandleData);
} // if( entry.first.find( "ts" )
} // for( auto const &entry : fChannels )
// InitContainers();
/// Create input vectors
fvDigiT0 = new std::vector<CbmTofDigi>();
fvDigiSts = new std::vector<CbmStsDigi>();
fvDigiMuch = new std::vector<CbmMuchBeamTimeDigi>();
fvDigiTrd = new std::vector<CbmTrdDigi>();
fvDigiTof = new std::vector<CbmTofDigi>();
fvDigiRich = new std::vector<CbmRichDigi>();
fvDigiPsd = new std::vector<CbmPsdDigi>();
/// Prepare storage TClonesArrays
/// TS MetaData storage
fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
if (NULL == fTimeSliceMetaDataArray) {
throw InitTaskError("Failed creating the TS meta data TClonesarray ");
} // if( NULL == fTimeSliceMetaDataArray )
/// Events storage
/// TODO: remove TObject from CbmEvent and switch to vectors!
fEventsArray = new TClonesArray("CbmEvent", 500);
if (NULL == fEventsArray) {
throw InitTaskError("Failed creating the Events TClonesarray ");
} // if( NULL == fEventsArray )
/// Prepare root output
if ("" != fsOutputFileName) {
fpRun = new FairRunOnline();
fpFairRootMgr = FairRootManager::Instance();
fpFairRootMgr->SetSink(new FairRootFileSink(fsOutputFileName));
if (nullptr == fpFairRootMgr->GetOutFile()) {
throw InitTaskError("Could not open root file");
} // if( nullptr == fpFairRootMgr->GetOutFile() )
} // if( "" != fsOutputFileName )
else {
throw InitTaskError("Empty output filename!");
} // else of if( "" != fsOutputFileName )
LOG(info) << "Init Root Output to " << fsOutputFileName;
fpFairRootMgr->InitSink();
// fEvtHeader = new FairEventHeader();
// fEvtHeader->SetRunId(iRunId);
// rootMgr->Register("EventHeader.", "Event", fEvtHeader, kTRUE);
// rootMgr->FillEventHeader(fEvtHeader);
/// Register all input data members with the FairRoot manager
/// TS MetaData
fpFairRootMgr->Register(
"TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kTRUE);
/// Digis storage
fpFairRootMgr->RegisterAny("T0Digi", fvDigiT0, kTRUE);
fpFairRootMgr->RegisterAny("StsDigi", fvDigiSts, kTRUE);
fpFairRootMgr->RegisterAny("MuchBeamTimeDigi", fvDigiMuch, kTRUE);
fpFairRootMgr->RegisterAny("TrdDigi", fvDigiTrd, kTRUE);
fpFairRootMgr->RegisterAny("TofDigi", fvDigiTof, kTRUE);
fpFairRootMgr->RegisterAny("RichDigi", fvDigiRich, kTRUE);
fpFairRootMgr->RegisterAny("PsdDigi", fvDigiPsd, kTRUE);
/// CbmEvent
fpFairRootMgr->Register("CbmEvent", "Cbm Event", fEventsArray, kTRUE);
/*
TTree* outTree =new TTree(FairRootManager::GetTreeName(), "/cbmout", 99);
LOG(info) << "define Tree " << outTree->GetName();
fpFairRootMgr->GetSink()->SetOutTree(outTree);
*/
fpFairRootMgr->WriteFolder();
LOG(info) << "Initialized outTree with rootMgr at " << fpFairRootMgr;
/// Histograms management
if (kTRUE == fbFillHistos) {
/*
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector< std::pair< TNamed *, std::string > > vHistos = fpAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector< std::pair< TCanvas *, std::string > > vCanvases = fpAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
{
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add( vHistos[ uHisto ].first );
std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
vHistos[ uHisto ].second );
fvpsHistosFolder.push_back( psHistoConfig );
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist( NewMessage() );
Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
/// Send message to the common histogram config messages queue
if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
{
throw InitTaskError( "Problem sending histo config" );
} // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
LOG(info) << "Config of hist " << psHistoConfig.first.data()
<< " in folder " << psHistoConfig.second.data() ;
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
{
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
fvpsCanvasConfig.push_back( psCanvConfig );
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan( NewMessage() );
Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
/// Send message to the common canvas config messages queue
if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
{
throw InitTaskError( "Problem sending canvas config" );
} // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
<< " is " << psCanvConfig.second.data() ;
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
*/
} // if( kTRUE == fbFillHistos )
} catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmDeviceMcbmEventSink::IsChannelNameAllowed(std::string channelName) {
for (auto const& entry : fsAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
const vector<std::string>::size_type idx =
pos - fsAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names at position "
<< idx;
return true;
} // if (pos1!=std::string::npos)
} // for(auto const &entry : fsAllowedChannels)
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
/*
Bool_t CbmDeviceMcbmEventSink::InitContainers()
{
LOG(info) << "Init parameter containers for CbmDeviceMcbmEventSink.";
if( kFALSE == InitParameters( fpAlgo ->GetParList() ) )
return kFALSE;
/// Need to add accessors for all options
fpAlgo ->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
Bool_t initOK = fpAlgo->InitContainers();
// Bool_t initOK = fMonitorAlgo->ReInitContainers();
return initOK;
}
Bool_t CbmDeviceMcbmEventSink::InitParameters( TList* fParCList )
{
for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
{
FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
fParCList->Remove( tempObj );
std::string paramName{ tempObj->GetName() };
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Her must come the proper Runid
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req( NewSimpleMessage(message) );
FairMQMessagePtr rep( NewMessage() );
FairParGenericSet* newObj = nullptr;
if( Send(req, "parameters") > 0 )
{
if( Receive( rep, "parameters" ) >= 0)
{
if( 0 != rep->GetSize() )
{
CbmMQTMessage tmsg( rep->GetData(), rep->GetSize() );
newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
LOG( info ) << "Received unpack parameter from the server:";
newObj->print();
} // if( 0 != rep->GetSize() )
else
{
LOG( error ) << "Received empty reply. Parameter not available";
return kFALSE;
} // else of if( 0 != rep->GetSize() )
} // if( Receive( rep, "parameters" ) >= 0)
} // if( Send(req, "parameters") > 0 )
fParCList->AddAt( newObj, iparC );
delete tempObj;
} // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
return kTRUE;
}
*/
//--------------------------------------------------------------------//
// handler is called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMcbmEventSink::HandleMissTsData(FairMQMessagePtr& msg,
int /*index*/) {
std::vector<uint64_t> vIndices;
std::string msgStrMissTs(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream issMissTs(msgStrMissTs);
boost::archive::binary_iarchive inputArchiveMissTs(issMissTs);
inputArchiveMissTs >> vIndices;
fvulMissedTsIndices.insert(
fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end());
/// Check TS queue and process it if needed (in case it filled a hole!)
CheckTsQueues();
return true;
}
//--------------------------------------------------------------------//
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMcbmEventSink::HandleData(FairMQParts& parts, int /*index*/) {
fulNumMessages++;
LOG(debug) << "Received message number " << fulNumMessages << " with "
<< parts.Size() << " parts"
<< ", size0: " << parts.At(0)->GetSize();
if (0 == fulNumMessages % 10000)
LOG(info) << "Received " << fulNumMessages << " messages";
/// Extract unpacked data from input message
uint32_t uPartIdx = 0;
/// TS metadata
/// TODO: code order of vectors in the TS MetaData!!
/*
std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
( parts.At( uPartIdx ) )->GetSize() );
std::istringstream issTsMeta(msgStrTsMeta);
boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
inputArchiveTsMeta >> (*fTsMetaData);
++uPartIdx;
*/
Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
LOG(debug) << "TS metadata extracted";
/// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!!
if (fuPrevTsIndex + 1 == fTsMetaData->GetIndex()
|| (0 == fuPrevTsIndex && 0 == fulTsCounter
&& 0 == fTsMetaData->GetIndex())) {
LOG(debug) << "TS direct to dump";
/// Fill all storage variables registers for data output
PrepareTreeEntry(parts);
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = fTsMetaData->GetIndex();
fulTsCounter++;
} // if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter ) )
else {
LOG(debug) << "TS direct to storage";
/// If not consecutive to last TS sent,
fmFullTsStorage.emplace_hint(
fmFullTsStorage.end(),
std::pair<uint64_t, CbmUnpackedTimeslice>(
fTsMetaData->GetIndex(), CbmUnpackedTimeslice(parts)));
} // else of if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex() )
LOG(debug) << "TS metadata checked";
/// Clear metadata => crashes, maybe not needed as due to move the pointer is invalidated?
// delete fTsMetaData;
/// Check TS queue and process it if needed (in case it filled a hole!)
CheckTsQueues();
LOG(debug) << "TS queues checked";
/// Histograms management
if (kTRUE == fbFillHistos) {
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime =
std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds =
currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fulNumMessages % fuPublishFreqTs
&& fdMinPublishTime < elapsedSeconds.count())) {
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
} // if( kTRUE == fbFillHistos )
return true;
}
//--------------------------------------------------------------------//
bool CbmDeviceMcbmEventSink::HandleCommand(FairMQMessagePtr& msg,
int /*index*/) {
/*
std::string sCommand( static_cast< char * >( msg->GetData() ),
msg->GetSize() );
*/
std::string sCommand;
std::string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream issCmd(msgStrCmd);
boost::archive::binary_iarchive inputArchiveCmd(issCmd);
inputArchiveCmd >> sCommand;
std::string sCmdTag = sCommand;
size_t charPosDel = sCommand.find(' ');
if (std::string::npos != charPosDel) {
sCmdTag = sCommand.substr(0, charPosDel);
} // if( std::string::npos != charPosDel )
if ("EOF" == sCmdTag) {
fbReceivedEof = true;
/// Extract the last TS index and global full TS count
if (std::string::npos == charPosDel) {
LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
<< "Incomplete EOF command received: " << sCommand;
return false;
} // if( std::string::npos == charPosDel )
/// Last TS index
charPosDel++;
std::string sNext = sCommand.substr(charPosDel);
charPosDel = sNext.find(' ');
if (std::string::npos == charPosDel) {
LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
<< "Incomplete EOF command received: " << sCommand;
return false;
} // if( std::string::npos == charPosDel )
fuLastTsIndex = std::stoul(sNext.substr(0, charPosDel));
/// Total TS count
charPosDel++;
fuTotalTsCount = std::stoul(sNext.substr(charPosDel));
LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
<< "Received EOF command with final TS index " << fuLastTsIndex
<< " and total nb TS " << fuTotalTsCount;
/// End of data: clean save of data + close file + send last state of histos if enabled
if (fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) {
LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
<< "Found final TS index " << fuLastTsIndex
<< " and total nb TS " << fuTotalTsCount;
Finish();
} // if( fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
} // if( "EOF" == sCmdTag )
else if ("STOP" == sCmdTag) {
/// TODO: different treatment in case of "BAD" ending compared to EOF?
/// Source failure: clean save of received data + close file + send last state of histos if enabled
Finish();
} // else if( "STOP" == sCmdTag )
else {
LOG(warning) << "Unknown command received: " << sCmdTag
<< " => will be ignored!";
} // else if command not recognized
return true;
}
//--------------------------------------------------------------------//
void CbmDeviceMcbmEventSink::CheckTsQueues() {
bool bHoleFoundInBothQueues = false;
std::map<uint64_t, CbmUnpackedTimeslice>::iterator itFullTs =
fmFullTsStorage.begin();
std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin();
while (!bHoleFoundInBothQueues) {
/// Check if the first TS in the full TS queue is the next one
if (fmFullTsStorage.end() != itFullTs
&& fuPrevTsIndex + 1 == (*itFullTs).first) {
/// Fill all storage variables registers for data output
PrepareTreeEntry((*itFullTs).second);
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = (*itFullTs).first;
fulTsCounter++;
/// Increment iterator
++itFullTs;
continue;
} // if( fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first() )
/// Check if the first TS in the missed TS queue is the next one
if (fvulMissedTsIndices.end() != itMissTs
&& fuPrevTsIndex + 1 == (*itMissTs)) {
/// Prepare entry with only dummy TS metadata and empty storage variables
new (
(*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
TimesliceMetaData(0, 0, 0, (*itMissTs));
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = (*itMissTs);
fulMissedTsCounter++;
/// Increment iterator
++itMissTs;
continue;
} // if( fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs ) )
/// Should be reached only if both queues at the end or hole found in both
bHoleFoundInBothQueues = true;
} // while( !bHoleFoundInBothQueues )
/// Delete the processed entries
fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs);
fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs);
/// End of data: clean save of data + close file + send last state of histos if enabled
if (fbReceivedEof && fuPrevTsIndex == fuLastTsIndex
&& fulTsCounter == fuTotalTsCount) {
LOG(info) << "CbmDeviceMcbmEventSink::CheckTsQueues => "
<< "Found final TS index " << fuLastTsIndex << " and total nb TS "
<< fuTotalTsCount;
Finish();
} // if( fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
}
//--------------------------------------------------------------------//
void CbmDeviceMcbmEventSink::PrepareTreeEntry(CbmUnpackedTimeslice unpTs) {
/// FIXME: poor man solution with lots of data copy until we undertsnad how to properly deal
/// with FairMq messages ownership and memory managment
/// FIXME: Not sure if this is the proper way to insert the data
new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
TimesliceMetaData(std::move(unpTs.fTsMetaData));
/*
/// Explicit copy version: safe but slow
/// T0
fvDigiT0->insert( fvDigiT0->end(), unpTs.fvDigiT0.begin(), unpTs.fvDigiT0.end() );
/// STS
fvDigiSts->insert( fvDigiSts->end(), unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end() );
/// MUCH
fvDigiMuch->insert( fvDigiMuch->end(), unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end() );
/// TRD
fvDigiTrd->insert( fvDigiTrd->end(), unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end() );
/// T0F
fvDigiTof->insert( fvDigiTof->end(), unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end() );
/// RICH
fvDigiRich->insert( fvDigiRich->end(), unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end() );
/// PSD
fvDigiPsd->insert( fvDigiPsd->end(), unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end() );
*/
/// move version: safe but slow
/// T0
(*fvDigiT0) = std::move(unpTs.fvDigiT0);
/// STS
(*fvDigiSts) = std::move(unpTs.fvDigiSts);
/// MUCH
(*fvDigiMuch) = std::move(unpTs.fvDigiMuch);
/// TRD
(*fvDigiTrd) = std::move(unpTs.fvDigiTrd);
/// T0F
(*fvDigiTof) = std::move(unpTs.fvDigiTof);
/// RICH
(*fvDigiRich) = std::move(unpTs.fvDigiRich);
/// PSD
(*fvDigiPsd) = std::move(unpTs.fvDigiPsd);
/// Extract CbmEvent TClonesArray from input message
fEventsArray->AbsorbObjects(&(unpTs.fEventsArray));
}
void CbmDeviceMcbmEventSink::DumpTreeEntry() {
// Unpacked digis + CbmEvent output to root file
/*
* NH style
// fpFairRootMgr->FillEventHeader(fEvtHeader);
// LOG(info) << "Fill WriteOutBuffer with FairRootManager at " << fpFairRootMgr;
// fpOutRootFile->cd();
fpFairRootMgr->Fill();
fpFairRootMgr->StoreWriteoutBufferData( fpFairRootMgr->GetEventTime() );
//fpFairRootMgr->StoreAllWriteoutBufferData();
fpFairRootMgr->DeleteOldWriteoutBufferData();
*/
/// FairRunOnline style
fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime());
fpFairRootMgr->Fill();
fpFairRootMgr->DeleteOldWriteoutBufferData();
/// Clear metadata array
fTimeSliceMetaDataArray->Clear();
/// Clear vectors
fvDigiT0->clear();
fvDigiSts->clear();
fvDigiMuch->clear();
fvDigiTrd->clear();
fvDigiTof->clear();
fvDigiRich->clear();
fvDigiPsd->clear();
/// Clear event array
// fEventsArray->Delete();
fEventsArray->Clear("C");
// fEventsArray->Clear();
}
//--------------------------------------------------------------------//
bool CbmDeviceMcbmEventSink::SendHistograms() {
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
Serialize<RootSerializer>(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
// fpAlgo->ResetHistograms( kFALSE );
return true;
}
//--------------------------------------------------------------------//
CbmDeviceMcbmEventSink::~CbmDeviceMcbmEventSink() {
/// FIXME: Add pointers check before delete
/// Close things properly if not alredy done
if (!fbFinishDone) Finish();
/// Clear metadata
fTimeSliceMetaDataArray->Clear();
delete fTimeSliceMetaDataArray;
delete fTsMetaData;
/// Clear vectors
fvDigiT0->clear();
fvDigiSts->clear();
fvDigiMuch->clear();
fvDigiTrd->clear();
fvDigiTof->clear();
fvDigiRich->clear();
fvDigiPsd->clear();
/// Clear events TClonesArray
fEventsArray->Clear();
delete fEventsArray;
delete fpRun;
}
void CbmDeviceMcbmEventSink::Finish() {
// Clean closure of output to root file
fpFairRootMgr->Write();
// fpFairRootMgr->GetSource()->Close();
fpFairRootMgr->CloseSink();
LOG(info) << "File closed after saving "
<< (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter
<< " full ones and " << fulMissedTsCounter << " missed/empty ones)";
if (kTRUE == fbFillHistos) {
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( kTRUE == fbFillHistos )
ChangeState(fair::mq::Transition::Stop);
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
ChangeState(fair::mq::Transition::End);
fbFinishDone = kTRUE;
}
CbmUnpackedTimeslice::CbmUnpackedTimeslice(FairMQParts& parts)
: fEventsArray("CbmEvent", 500) {
/// Extract unpacked data from input message
uint32_t uPartIdx = 0;
/// TS metadata
/// TODO: code order of vectors in the TS MetaData!!
/*
std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
( parts.At( uPartIdx ) )->GetSize() );
std::istringstream issTsMeta(msgStrTsMeta);
boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
inputArchiveTsMeta >> (*fTsMetaData);
++uPartIdx;
*/
TObject* tempObjectMeta = nullptr;
RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectMeta);
++uPartIdx;
if (TString(tempObjectMeta->ClassName()).EqualTo("TimesliceMetaData")) {
fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectMeta));
} // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )
/// T0
std::string msgStrT0(static_cast<char*>(parts.At(uPartIdx)->GetData()),
(parts.At(uPartIdx))->GetSize());
std::istringstream issT0(msgStrT0);
boost::archive::binary_iarchive inputArchiveT0(issT0);
inputArchiveT0 >> fvDigiT0;
++uPartIdx;
/// STS
std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()),
(parts.At(uPartIdx))->GetSize());
std::istringstream issSts(msgStrSts);
boost::archive::binary_iarchive inputArchiveSts(issSts);
inputArchiveSts >> fvDigiSts;
++uPartIdx;
/// MUCH
std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()),
(parts.At(uPartIdx))->GetSize());
std::istringstream issMuch(msgStrMuch);
boost::archive::binary_iarchive inputArchiveMuch(issMuch);
inputArchiveMuch >> fvDigiMuch;
++uPartIdx;
/// TRD
std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()),
(parts.At(uPartIdx))->GetSize());
std::istringstream issTrd(msgStrTrd);
boost::archive::binary_iarchive inputArchiveTrd(issTrd);
inputArchiveTrd >> fvDigiTrd;
++uPartIdx;
/// T0F
std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()),
(parts.At(uPartIdx))->GetSize());
std::istringstream issTof(msgStrTof);
boost::archive::binary_iarchive inputArchiveTof(issTof);
inputArchiveTof >> fvDigiTof;
++uPartIdx;
/// RICH
std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()),
(parts.At(uPartIdx))->GetSize());
std::istringstream issRich(msgStrRich);
boost::archive::binary_iarchive inputArchiveRich(issRich);
inputArchiveRich >> fvDigiRich;
++uPartIdx;
/// PSD
std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()),
(parts.At(uPartIdx))->GetSize());
std::istringstream issPsd(msgStrPsd);
boost::archive::binary_iarchive inputArchivePsd(issPsd);
inputArchivePsd >> fvDigiPsd;
++uPartIdx;
/// Extract CbmEvent TClonesArray from input message
TObject* tempObject = nullptr;
RootSerializer().Deserialize(*parts.At(uPartIdx), tempObject);
++uPartIdx;
if (TString(tempObject->ClassName()).EqualTo("TClonesArray")) {
TClonesArray* arrayEventsIn = static_cast<TClonesArray*>(tempObject);
/// Copy data in registered TClonesArray (by taking ownership!)
fEventsArray.AbsorbObjects(arrayEventsIn);
} // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )
}
CbmUnpackedTimeslice::~CbmUnpackedTimeslice()
{
fvDigiT0.clear();
fvDigiSts.clear();
fvDigiMuch.clear();
fvDigiTrd.clear();
fvDigiTof.clear();
fvDigiRich.clear();
fvDigiPsd.clear();
// fEventsArray.Clear("C");
fEventsArray.Delete();
}