From 26080ec4527f4654f4dffc448641c41c6ef56eab Mon Sep 17 00:00:00 2001 From: Volker Friese <v.friese@gsi.de> Date: Sun, 26 Jun 2022 08:43:11 +0200 Subject: [PATCH] Revised CbmDevEventSink. Remove histogramming and timeslice buffering. --- reco/mq/CbmDevEventSink.cxx | 642 ++++++++---------------------------- reco/mq/CbmDevEventSink.h | 162 +++------ reco/mq/runEventSink.cxx | 22 +- reco/mq/startUnpack.sh.in | 9 +- 4 files changed, 186 insertions(+), 649 deletions(-) diff --git a/reco/mq/CbmDevEventSink.cxx b/reco/mq/CbmDevEventSink.cxx index 00b626742f..25eaa53827 100644 --- a/reco/mq/CbmDevEventSink.cxx +++ b/reco/mq/CbmDevEventSink.cxx @@ -1,20 +1,16 @@ -/* Copyright (C) 2020-2022 Facility for Antiproton and Ion Research in Europe, Darmstadt +/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt SPDX-License-Identifier: GPL-3.0-only - Authors: Pierre-Alain Loizeau [committer], Dominik Smith */ + Authors: Dominik Smith [committer], Pierre-Alain Loizeau, Volker Friese */ #include "CbmDevEventSink.h" -/// CBM headers -#include "CbmEvent.h" -#include "CbmFlesCanvasTools.h" +// CBM headers #include "CbmMQDefs.h" #include "TimesliceMetaData.h" -/// FAIRROOT headers -#include "FairMQLogger.h" +// FAIRROOT headers #include "FairMQProgOptions.h" // device->fConfig -#include "FairParGenericSet.h" #include "FairRootFileSink.h" #include "FairRootManager.h" #include "FairRunOnline.h" @@ -23,568 +19,190 @@ #include "RootSerializer.h" -/// FAIRSOFT headers (geant, boost, ...) -#include "TCanvas.h" -#include "TFile.h" -#include "TH1.h" -#include "TList.h" -#include "TNamed.h" - +// External packages #include <boost/archive/binary_iarchive.hpp> #include <boost/serialization/utility.hpp> -/// C/C++ headers +/// C++ headers #include <thread> // this_thread::sleep_for -#include <array> -#include <iomanip> #include <stdexcept> #include <string> + + +using std::istringstream; +using std::string; +using std::vector; + + struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; }; -using namespace std; -//Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE; +// ----- Destructor ------------------------------------------------------- +CbmDevEventSink::~CbmDevEventSink() +{ + + // Close things properly if not already done + if (!fFinishDone) Finish(); + + // Clear and delete members + if (fTsMetaData) delete fTsMetaData; + if (fEventVec != nullptr) { + fEventVec->clear(); + delete fEventVec; + } + if (fFairRun) delete fFairRun; +} +// ---------------------------------------------------------------------------- -CbmDevEventSink::CbmDevEventSink() {} +// ----- Initialize ------------------------------------------------------- void CbmDevEventSink::InitTask() try { - /// Read options from executable - LOG(info) << "Init options for CbmDevEventSink."; - - fsOutputFileName = fConfig->GetValue<std::string>("OutFileName"); - - fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn"); - fsAllowedChannels[0] = fsChannelNameDataInput; - - fbFillHistos = fConfig->GetValue<bool>("FillHistos"); - fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); - fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); - fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); - fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); - - /// Associate the MissedTs Channel to the corresponding handler - OnData(fsChannelNameMissedTs, &CbmDevEventSink::HandleMissTsData); - - /// Associate the command Channel to the corresponding handler - OnData(fsChannelNameCommands, &CbmDevEventSink::HandleCommand); - - /// Associate the Event + Unp data Channel to the corresponding handler - // Get the information about created channels from the device - // Check if the defined channels from the topology (by name) - // are in the list of channels which are possible/allowed - // for the device - // The idea is to check at initilization if the devices are - // properly connected. For the time beeing this is done with a - // nameing convention. It is not avoided that someone sends other - // data on this channel. - //logger::SetLogLevel("INFO"); - int noChannel = fChannels.size(); - LOG(info) << "Number of defined channels: " << noChannel; - for (auto const& entry : fChannels) { - LOG(info) << "Channel name: " << entry.first; - if (std::string::npos != entry.first.find(fsChannelNameDataInput)) { - if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); - OnData(entry.first, &CbmDevEventSink::HandleData); - } // if( entry.first.find( "ts" ) - } // for( auto const &entry : fChannels ) - - // InitContainers(); - - /// Prepare storage TClonesArrays - /// TS MetaData storage - fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1); - if (NULL == fTimeSliceMetaDataArray) { - throw InitTaskError("Failed creating the TS meta data TClonesarray "); - } // if( NULL == fTimeSliceMetaDataArray ) - /// Events storage - /// TODO: remove TObject from CbmEvent and switch to vectors! - fEventsSel = new std::vector<CbmDigiEvent>(); - - /// Prepare root output - if ("" != fsOutputFileName) { - fpRun = new FairRunOnline(); - fpFairRootMgr = FairRootManager::Instance(); - fpFairRootMgr->SetSink(new FairRootFileSink(fsOutputFileName)); - if (nullptr == fpFairRootMgr->GetOutFile()) { - throw InitTaskError("Could not open root file"); - } // if( nullptr == fpFairRootMgr->GetOutFile() ) - } // if( "" != fsOutputFileName ) + + // Read options from executable + LOG(info) << "Init options for CbmDevEventSink"; + string outputFileName = fConfig->GetValue<std::string>("OutFileName"); + string channelNameDataInput = fConfig->GetValue<std::string>("ChannelNameDataInput"); + string channelNameCommands = fConfig->GetValue<std::string>("ChannelNameCommands"); + + // --- Hook action on input channels + OnData(channelNameDataInput, &CbmDevEventSink::HandleData); + OnData(channelNameCommands, &CbmDevEventSink::HandleCommand); + + // --- Prepare ROOT output + // TODO: WE use FairRunOnline and FairRootManager to manage the output. There might be a more + // elegant way. + fTsMetaData = new TimesliceMetaData(); + fEventVec = new vector<CbmDigiEvent>(); + if ("" != outputFileName) { + fFairRun = new FairRunOnline(); + fFairRootMgr = FairRootManager::Instance(); + fFairRootMgr->SetSink(new FairRootFileSink(outputFileName)); + if (nullptr == fFairRootMgr->GetOutFile()) throw InitTaskError("Could not open ROOT file"); + } else { throw InitTaskError("Empty output filename!"); - } // else of if( "" != fsOutputFileName ) - - LOG(info) << "Init Root Output to " << fsOutputFileName; - - fpFairRootMgr->InitSink(); - /// Register all input data members with the FairRoot manager - /// TS MetaData - fpFairRootMgr->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kTRUE); - /// CbmEvent - fpFairRootMgr->RegisterAny("DigiEvent", fEventsSel, kTRUE); - fpFairRootMgr->WriteFolder(); - - LOG(info) << "Initialized outTree with rootMgr at " << fpFairRootMgr; - - /// Histograms management - if (kTRUE == fbFillHistos) { - /// Comment to prevent clang format single lining - if (kFALSE == InitHistograms()) { throw InitTaskError("Failed to initialize the histograms."); } - } // if( kTRUE == fbFillHistos ) + } + fFairRootMgr->InitSink(); + fFairRootMgr->RegisterAny("TimesliceMetaData.", fTsMetaData, kTRUE); + fFairRootMgr->RegisterAny("DigiEvent", fEventVec, kTRUE); + fFairRootMgr->WriteFolder(); + LOG(info) << "Init ROOT Output to " << outputFileName; } catch (InitTaskError& e) { LOG(error) << e.what(); - // Wrapper defined in CbmMQDefs.h to support different FairMQ versions cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); } +// ---------------------------------------------------------------------------- -bool CbmDevEventSink::IsChannelNameAllowed(std::string channelName) -{ - for (auto const& entry : fsAllowedChannels) { - std::size_t pos1 = channelName.find(entry); - if (pos1 != std::string::npos) { - const vector<std::string>::const_iterator pos = - std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry); - const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin(); - LOG(info) << "Found " << entry << " in " << channelName; - LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx; - return true; - } // if (pos1!=std::string::npos) - } // for(auto const &entry : fsAllowedChannels) - LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names."; - LOG(error) << "Stop device."; - return false; -} - -bool CbmDevEventSink::InitHistograms() -{ - /// Histos creation and obtain pointer on them - /// Trigger histo creation, filling vHistos and vCanvases - // bool initOK =CreateHistograms(); - bool initOK = true; - - /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) - // ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector(); - std::vector<std::pair<TNamed*, std::string>> vHistos = {}; - /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) - // ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector(); - std::vector<std::pair<TCanvas*, std::string>> vCanvases = {}; - - /// Add pointers to each histo in the histo array - /// Create histo config vector - /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > - /// and send it through a separate channel using the BoostSerializer - for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { - // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() - // << " in " << vHistos[ uHisto ].second.data() - // ; - fArrayHisto.Add(vHistos[uHisto].first); - std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); - fvpsHistosFolder.push_back(psHistoConfig); - - LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); - } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) - - /// Create canvas config vector - /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > - /// and send it through a separate channel using the BoostSerializer - for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { - // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() - // << " in " << vCanvases[ uCanv ].second.data(); - std::string sCanvName = (vCanvases[uCanv].first)->GetName(); - std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); - - std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); - - fvpsCanvasConfig.push_back(psCanvConfig); - - LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); - } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) - - return initOK; -} -//--------------------------------------------------------------------// -// handler is called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and a sub-channel index (here 0) -bool CbmDevEventSink::HandleMissTsData(FairMQMessagePtr& msg, int /*index*/) +// ----- Finish execution ------------------------------------------------- +void CbmDevEventSink::Finish() { - std::vector<uint64_t> vIndices; - std::string msgStrMissTs(static_cast<char*>(msg->GetData()), msg->GetSize()); - std::istringstream issMissTs(msgStrMissTs); - boost::archive::binary_iarchive inputArchiveMissTs(issMissTs); - inputArchiveMissTs >> vIndices; - - fvulMissedTsIndices.insert(fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end()); - - /// Check TS queue and process it if needed (in case it filled a hole!) - CheckTsQueues(); - - return true; + fFairRootMgr->Write(); + fFairRootMgr->CloseSink(); + LOG(info) << "File closed after " << fNumMessages << " and saving " << fNumTs << " TS"; + LOG(info) << "Index of last processed timeslice: " << fPrevTsIndex, ChangeState(fair::mq::Transition::Stop); + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + ChangeState(fair::mq::Transition::End); + fFinishDone = true; } -//--------------------------------------------------------------------// -// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) -bool CbmDevEventSink::HandleData(FairMQParts& parts, int /*index*/) -{ - fulNumMessages++; - LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts" - << ", size0: " << parts.At(0)->GetSize(); +// ---------------------------------------------------------------------------- - if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; - - /// Unpack the message - CbmEventTimeslice unpTs(parts); - - /// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!! - LOG(debug) << "Next TS check " << fuPrevTsIndex << " " << fulTsCounter << " " << unpTs.fTsMetaData.GetIndex(); - if (fuPrevTsIndex + 1 == unpTs.fTsMetaData.GetIndex() - || (0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == unpTs.fTsMetaData.GetIndex())) { - LOG(debug) << "TS direct to dump"; - /// Fill all storage variables registers for data output - PrepareTreeEntry(unpTs); - /// Trigger FairRoot manager to dump Tree entry - DumpTreeEntry(); - /// Update counters - fuPrevTsIndex = unpTs.fTsMetaData.GetIndex(); - fulTsCounter++; - } - else { - LOG(debug) << "TS direct to storage"; - /// If not consecutive to last TS sent, - fmFullTsStorage.emplace_hint(fmFullTsStorage.end(), - std::pair<uint64_t, CbmEventTimeslice>(unpTs.fTsMetaData.GetIndex(), unpTs)); - } - LOG(debug) << "TS metadata checked"; - - /// Clear metadata => crashes, maybe not needed as due to move the pointer is invalidated? - // delete fTsMetaData; - - /// Check TS queue and process it if needed (in case it filled a hole!) - CheckTsQueues(); - LOG(debug) << "TS queues checked"; - - /// Histograms management - if (kTRUE == fbFillHistos) { - /// Send histograms each 100 time slices. Should be each ~1s - /// Use also runtime checker to trigger sending after M s if - /// processing too slow or delay sending if processing too fast - std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); - std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; - if ((fdMaxPublishTime < elapsedSeconds.count()) - || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { - if (!fbConfigSent) { - // Send the configuration only once per run! - fbConfigSent = SendHistoConfAndData(); - } // if( !fbConfigSent ) - else - SendHistograms(); - - fLastPublishTime = std::chrono::system_clock::now(); - } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) - } // if( kTRUE == fbFillHistos ) - - LOG(debug) << "Processed TS with saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter - << " full ones and " << fulMissedTsCounter << " missed/empty ones)"; - LOG(debug) << "Buffers are " << fmFullTsStorage.size() << " full TS and " << fvulMissedTsIndices.size() - << " missed/empty ones)"; - LOG(debug) << "Buffers are " << fmFullTsStorage.size() << " full TS and " << fvulMissedTsIndices.size() - << " missed/empty ones)"; - return true; -} -//--------------------------------------------------------------------// -bool CbmDevEventSink::HandleCommand(FairMQMessagePtr& msg, int /*index*/) +// ----- Handle command message ------------------------------------------- +bool CbmDevEventSink::HandleCommand(FairMQMessagePtr& msg, int) { - /* - std::string sCommand( static_cast< char * >( msg->GetData() ), - msg->GetSize() ); -*/ - std::string sCommand; - std::string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize()); - std::istringstream issCmd(msgStrCmd); + // Deserialize command string + string command; + string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize()); + istringstream issCmd(msgStrCmd); boost::archive::binary_iarchive inputArchiveCmd(issCmd); - inputArchiveCmd >> sCommand; + inputArchiveCmd >> command; - std::string sCmdTag = sCommand; - size_t charPosDel = sCommand.find(' '); - if (std::string::npos != charPosDel) { - sCmdTag = sCommand.substr(0, charPosDel); - } // if( std::string::npos != charPosDel ) + // Command tag is up to the first blank + size_t charPosDel = command.find(' '); + string type = command.substr(0, charPosDel); - if ("EOF" == sCmdTag) { - fbReceivedEof = true; + // EOF command + if (type == "EOF") { - /// Extract the last TS index and global full TS count - if (std::string::npos == charPosDel) { - LOG(fatal) << "CbmDevEventSink::HandleCommand => " - << "Incomplete EOF command received: " << sCommand; + // The second substring should be the last timeslice index + if (charPosDel == string::npos) { + LOG(error) << "HandleCommand: Incomplete EOF command " << command; return false; - } // if( std::string::npos == charPosDel ) - /// Last TS index + } charPosDel++; - std::string sNext = sCommand.substr(charPosDel); - charPosDel = sNext.find(' '); - - if (std::string::npos == charPosDel) { - LOG(fatal) << "CbmDevEventSink::HandleCommand => " - << "Incomplete EOF command received: " << sCommand; + string rest = command.substr(charPosDel); + charPosDel = rest.find(' '); + if (charPosDel == string::npos) { + LOG(error) << "HandleCommand: Incomplete EOF command " << command; return false; - } // if( std::string::npos == charPosDel ) - fuLastTsIndex = std::stoul(sNext.substr(0, charPosDel)); - /// Total TS count - charPosDel++; - fuTotalTsCount = std::stoul(sNext.substr(charPosDel)); - - LOG(info) << "CbmDevEventSink::HandleCommand => " - << "Received EOF command with final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount; - /// End of data: clean save of data + close file + send last state of histos if enabled - if (fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) { - LOG(info) << "CbmDevEventSink::HandleCommand => " - << "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount; - Finish(); - } // if( fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount ) - } // if( "EOF" == sCmdTag ) - else if ("STOP" == sCmdTag) { - /// TODO: different treatment in case of "BAD" ending compared to EOF? - /// Source failure: clean save of received data + close file + send last state of histos if enabled - Finish(); - } // else if( "STOP" == sCmdTag ) - else { - LOG(warning) << "Unknown command received: " << sCmdTag << " => will be ignored!"; - } // else if command not recognized - - return true; -} -//--------------------------------------------------------------------// -void CbmDevEventSink::CheckTsQueues() -{ - bool bHoleFoundInBothQueues = false; - - std::map<uint64_t, CbmEventTimeslice>::iterator itFullTs = fmFullTsStorage.begin(); - std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin(); - - while (!bHoleFoundInBothQueues) { - /// Check if the first TS in the full TS queue is the next one - if (fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first) { - /// Fill all storage variables registers for data output - PrepareTreeEntry((*itFullTs).second); - /// Trigger FairRoot manager to dump Tree entry - DumpTreeEntry(); - - /// Update counters - fuPrevTsIndex = (*itFullTs).first; - fulTsCounter++; - - /// Increment iterator - ++itFullTs; - continue; - } // if( fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first() ) - if (fmFullTsStorage.end() != itFullTs) - LOG(debug) << "CbmDevEventSink::CheckTsQueues => Full TS " << (*itFullTs).first << " VS " << (fuPrevTsIndex + 1); - /// Check if the first TS in the missed TS queue is the next one - if (fvulMissedTsIndices.end() != itMissTs - && ((0 == fuPrevTsIndex && fuPrevTsIndex == (*itMissTs)) - || ((0 < fulTsCounter || 0 < fulMissedTsCounter) && fuPrevTsIndex + 1 == (*itMissTs)))) { - - /// Prepare entry with only dummy TS metadata and empty storage variables - new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()]) - TimesliceMetaData(0, 0, 0, (*itMissTs)); - - /// Trigger FairRoot manager to dump Tree entry - DumpTreeEntry(); - - /// Update counters - fuPrevTsIndex = (*itMissTs); - fulMissedTsCounter++; - - /// Increment iterator - ++itMissTs; - continue; - } // if( fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs ) ) - if (fvulMissedTsIndices.end() != itMissTs) - LOG(debug) << "CbmDevEventSink::CheckTsQueues => Empty TS " << (*itMissTs) << " VS " << (fuPrevTsIndex + 1); - - /// Should be reached only if both queues at the end or hole found in both - bHoleFoundInBothQueues = true; - } // while( !bHoleFoundInBothQueues ) - - LOG(debug) << "CbmDevEventSink::CheckTsQueues => buffered TS " << fmFullTsStorage.size() << " buffered empties " - << fvulMissedTsIndices.size(); - for (auto it = fmFullTsStorage.begin(); it != fmFullTsStorage.end(); ++it) { - LOG(debug) << "CbmDevEventSink::CheckTsQueues => buffered TS index " << (*it).first; - } + } + uint64_t lastTsIndex = std::stoul(rest.substr(0, charPosDel)); - /// Delete the processed entries - fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs); - fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs); + // The third substring should be the timeslice count + charPosDel++; + uint64_t numTs = std::stoul(rest.substr(charPosDel)); - /// End of data: clean save of data + close file + send last state of histos if enabled - if (fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) { - LOG(info) << "CbmDevEventSink::CheckTsQueues => " - << "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount; + // Log + LOG(info) << "HandleCommand: Received EOF command with final TS index " << lastTsIndex << " and total number of TS " + << numTs; Finish(); - } // if( fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount ) -} -//--------------------------------------------------------------------// -void CbmDevEventSink::PrepareTreeEntry(CbmEventTimeslice unpTs) -{ - /// FIXME: poor man solution with lots of data copy until we undertsnad how to properly deal - /// with FairMq messages ownership and memory managment - - /// FIXME: Not sure if this is the proper way to insert the data - new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()]) - TimesliceMetaData(std::move(unpTs.fTsMetaData)); - - /// Extract CbmEvent TClonesArray from input message - (*fEventsSel) = std::move(unpTs.GetSelectedData()); -} - -void CbmDevEventSink::DumpTreeEntry() -{ - // Unpacked digis + CbmEvent output to root file - /// FairRunOnline style - fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime()); - fpFairRootMgr->Fill(); - fpFairRootMgr->DeleteOldWriteoutBufferData(); - - /// Clear metadata array - fTimeSliceMetaDataArray->Clear(); - - /// Clear event vector - fEventsSel->clear(); -} - -//--------------------------------------------------------------------// - -bool CbmDevEventSink::SendHistoConfAndData() -{ - /// Prepare multiparts message and header - std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size()); - FairMQMessagePtr messageHeader(NewMessage()); - BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader); - - FairMQParts partsOut; - partsOut.AddPart(std::move(messageHeader)); - - for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) { - /// Serialize the vector of histo config into a single MQ message - FairMQMessagePtr messageHist(NewMessage()); - BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]); - - partsOut.AddPart(std::move(messageHist)); - } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) - - for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) { - /// Serialize the vector of canvas config into a single MQ message - FairMQMessagePtr messageCan(NewMessage()); - BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]); - - partsOut.AddPart(std::move(messageCan)); - } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) - - /// Serialize the array of histos into a single MQ message - FairMQMessagePtr msgHistos(NewMessage()); - RootSerializer().Serialize(*msgHistos, &fArrayHisto); - - partsOut.AddPart(std::move(msgHistos)); - - /// Send the multi-parts message to the common histogram messages queue - if (Send(partsOut, fsChannelNameHistosInput) < 0) { - LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data"; - return false; - } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 ) - - /// Reset the histograms after sending them (but do not reset the time) - // ResetHistograms(kFALSE); - - return true; -} + } //? EOF -bool CbmDevEventSink::SendHistograms() -{ - /// Serialize the array of histos into a single MQ message - FairMQMessagePtr message(NewMessage()); - RootSerializer().Serialize(*message, &fArrayHisto); - - /// Send message to the common histogram messages queue - if (Send(message, fsChannelNameHistosInput) < 0) { - LOG(error) << "Problem sending data"; - return false; - } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + // STOP command + else if (type == "STOP") { + LOG(info) << "HandleCommand: Received STOP command"; + Finish(); + } - /// Reset the histograms after sending them (but do not reset the time) - // ResetHistograms(kFALSE); + // Unknown command + else { + LOG(warning) << "HandleCommand: Unknown command " << type << " => will be ignored!"; + } return true; } +// ---------------------------------------------------------------------------- -//--------------------------------------------------------------------// -CbmDevEventSink::~CbmDevEventSink() -{ - /// FIXME: Add pointers check before delete - - /// Close things properly if not alredy done - if (!fbFinishDone) Finish(); - - /// Clear events vector - fEventsSel->clear(); - delete fEventsSel; - - delete fpRun; -} - -void CbmDevEventSink::Finish() -{ - // Clean closure of output to root file - fpFairRootMgr->Write(); - // fpFairRootMgr->GetSource()->Close(); - fpFairRootMgr->CloseSink(); - LOG(info) << "File closed after saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter - << " full ones and " << fulMissedTsCounter << " missed/empty ones)"; - LOG(info) << "Still buffered TS " << fmFullTsStorage.size() << " and still buffered empties " - << fvulMissedTsIndices.size(); - - if (kTRUE == fbFillHistos) { - SendHistograms(); - fLastPublishTime = std::chrono::system_clock::now(); - } // if( kTRUE == fbFillHistos ) - - ChangeState(fair::mq::Transition::Stop); - std::this_thread::sleep_for(std::chrono::milliseconds(3000)); - ChangeState(fair::mq::Transition::End); - - fbFinishDone = kTRUE; -} -CbmEventTimeslice::CbmEventTimeslice(FairMQParts& parts) +// ----- Handle data in input channel ------------------------------------- +bool CbmDevEventSink::HandleData(FairMQParts& parts, int) { - /// Extract unpacked data from input message - uint32_t uPartIdx = 0; + fNumMessages++; + LOG(debug) << "Received message number " << fNumMessages << " with " << parts.Size() << " parts" + << ", size0: " << parts.At(0)->GetSize(); + if (0 == fNumMessages % 10000) LOG(info) << "Received " << fNumMessages << " messages"; - /// TS metadata + // --- Extract TimesliceMetaData (part 0) TObject* tempObjectPointer = nullptr; TObject* tempObjectPointer = nullptr; - RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); - + RootSerializer().Deserialize(*parts.At(0), tempObjectPointer); if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) { - fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectPointer)); + (*fTsMetaData) = *(static_cast<TimesliceMetaData*>(tempObjectPointer)); } else { LOG(fatal) << "Failed to deserialize the TS metadata"; } - ++uPartIdx; - /// Events - std::string msgStrEvt(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + // --- Extract event vector (part 1) + std::string msgStrEvt(static_cast<char*>(parts.At(1)->GetData()), (parts.At(1))->GetSize()); std::istringstream issEvt(msgStrEvt); boost::archive::binary_iarchive inputArchiveEvt(issEvt); - inputArchiveEvt >> fvEvents; - ++uPartIdx; + inputArchiveEvt >> (*fEventVec); - LOG(debug) << "Input event array " << fvEvents.size(); -} + // --- Dump tree entry for this timeslice + fFairRootMgr->StoreWriteoutBufferData(fFairRootMgr->GetEventTime()); + fFairRootMgr->Fill(); + fFairRootMgr->DeleteOldWriteoutBufferData(); + fEventVec->clear(); -CbmEventTimeslice::~CbmEventTimeslice() { fvEvents.clear(); } + // --- Timeslice log + LOG(info) << "Processed TS " << fTsMetaData->GetIndex() << " with " << fEventVec->size() << " events"; -std::vector<CbmDigiEvent> CbmEventTimeslice::GetSelectedData() { return fvEvents; } + return true; +} +// ---------------------------------------------------------------------------- diff --git a/reco/mq/CbmDevEventSink.h b/reco/mq/CbmDevEventSink.h index b05592bb37..e9c7d0ba74 100644 --- a/reco/mq/CbmDevEventSink.h +++ b/reco/mq/CbmDevEventSink.h @@ -1,137 +1,73 @@ -/* Copyright (C) 2020-2022 Facility for Antiproton and Ion Research in Europe, Darmstadt +/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt SPDX-License-Identifier: GPL-3.0-only - Authors: Pierre-Alain Loizeau [committer], Dominik Smith */ + Authors: Dominik Smith [committer], Pierre-Alain Loizeau, Volker Friese */ #ifndef CBMDEVICEEVTSINK_H_ #define CBMDEVICEEVTSINK_H_ -/// CBM headers #include "CbmDigiEvent.h" -#include "CbmEvent.h" -#include "CbmMqTMessage.h" -#include "CbmMuchDigi.h" -#include "CbmPsdDigi.h" -#include "CbmRichDigi.h" -#include "CbmStsDigi.h" -#include "CbmTofDigi.h" -#include "CbmTrdDigi.h" - -#include "TimesliceMetaData.h" - -/// FAIRROOT headers -#include "FairMQDevice.h" -/// FAIRSOFT headers (geant, boost, ...) -#include "Rtypes.h" -#include "TClonesArray.h" -#include "TObjArray.h" +#include "FairMQDevice.h" -/// C/C++ headers -#include <chrono> -#include <map> #include <vector> -class TFile; -class TList; -class TClonesArray; +class TimesliceMetaData; class FairRunOnline; class FairRootManager; -class CbmEventTimeslice { - /// TODO: rename to CbmTsWithEvents -public: - CbmEventTimeslice(FairMQParts& parts); - ~CbmEventTimeslice(); - - std::vector<CbmDigiEvent> GetSelectedData(); - - TimesliceMetaData fTsMetaData; - std::vector<CbmDigiEvent> fvEvents; -}; +/** @class CbmDefEventSink + ** @brief MQ device class to write CbmDigiEvents to a ROOT file + ** @author Dominik Smith <d.smith@gsi.de> + ** + ** Based on previous, similar implementations by P.-A. Loizeau + ** + ** The event sink device receives data (vector of CbmDigiEvents for a given timeslice) in the + ** respective input channel and fills a ROOT tree/file with these data. + **/ class CbmDevEventSink : public FairMQDevice { public: - CbmDevEventSink(); + /** @brief Constructor **/ + CbmDevEventSink() {}; + + /** @brief Destructor **/ virtual ~CbmDevEventSink(); + protected: + /** @brief Action on command messages + ** @param parts Message + ** @param flag Not used; ignored + ** @return Success + **/ + bool HandleCommand(FairMQMessagePtr&, int flag); + + /** @brief Action on data messages + ** @param parts Message + ** @param flag Not used; ignored + ** @return Success + */ + bool HandleData(FairMQParts& parts, int flag); + + /** @brief Initialization **/ virtual void InitTask(); - bool HandleMissTsData(FairMQMessagePtr&, int); - bool HandleData(FairMQParts&, int); - bool HandleCommand(FairMQMessagePtr&, int); - -private: - /// Constants - - /// Control flags - Bool_t fbFillHistos = false; //! Switch ON/OFF filling of histograms - Bool_t fbFinishDone = false; //! Keep track of whether the Finish was already called - - /// User settings parameters - /// Algo enum settings - std::string fsOutputFileName = "mcbm_digis_events.root"; - /// message queues - std::string fsChannelNameMissedTs = "missedts"; - std::string fsChannelNameDataInput = "events"; - std::string fsChannelNameCommands = "commands"; - std::string fsChannelNameHistosInput = "histogram-in"; - /// Histograms management - uint32_t fuPublishFreqTs = 100; - double_t fdMinPublishTime = 0.5; - double_t fdMaxPublishTime = 5.0; - - /// List of MQ channels names - std::vector<std::string> fsAllowedChannels = {fsChannelNameDataInput}; - - /// Statistics & missed TS detection - uint64_t fuPrevTsIndex = 0; - uint64_t fulNumMessages = 0; - uint64_t fulTsCounter = 0; - uint64_t fulMissedTsCounter = 0; - std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); - - /// Control Commands reception - bool fbReceivedEof = false; - uint64_t fuLastTsIndex = 0; - uint64_t fuTotalTsCount = 0; - - /// Data reception - /// TS MetaData storage - TClonesArray* fTimeSliceMetaDataArray = nullptr; //! - TimesliceMetaData* fTsMetaData = nullptr; - /// CbmEvents - std::vector<CbmDigiEvent>* fEventsSel = nullptr; //! output container of CbmEvents - - /// Storage for re-ordering - /// Missed TS vector - std::vector<uint64_t> fvulMissedTsIndices = {}; - /// Buffered TS - std::map<uint64_t, CbmEventTimeslice> fmFullTsStorage = {}; - - /// Data storage - FairRunOnline* fpRun = nullptr; - FairRootManager* fpFairRootMgr = nullptr; - - /// Array of histograms to send to the histogram server - TObjArray fArrayHisto = {}; - /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server - std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {}; - /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server - /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" - /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" - std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; - /// Flag indicating whether the histograms and canvases configurations were already published - bool fbConfigSent = false; - - /// Internal methods - bool IsChannelNameAllowed(std::string channelName); - bool InitHistograms(); - void CheckTsQueues(); - void PrepareTreeEntry(CbmEventTimeslice unpTs); - void DumpTreeEntry(); - bool SendHistoConfAndData(); - bool SendHistograms(); + + +private: // methods + /** @brief Finishing run **/ void Finish(); + + +private: // members + // --- Counters and status flags + size_t fNumMessages = 0; ///< Number of received data messages + size_t fNumTs = 0; ///< Number of processed timeslices + uint64_t fPrevTsIndex = 0; ///< Index of last processed timeslice + bool fFinishDone = false; ///< Keep track of whether the Finish method was already called + TimesliceMetaData* fTsMetaData = nullptr; ///< Data output: TS meta data + std::vector<CbmDigiEvent>* fEventVec = nullptr; ///< Data output: events + FairRunOnline* fFairRun = nullptr; ///< FairRunOnline to instantiate FairRootManager + FairRootManager* fFairRootMgr = nullptr; ///< FairRootManager used for ROOT file I/O }; #endif /* CBMDEVICEEVTSINK_H_ */ diff --git a/reco/mq/runEventSink.cxx b/reco/mq/runEventSink.cxx index ec49760825..31c51976c1 100644 --- a/reco/mq/runEventSink.cxx +++ b/reco/mq/runEventSink.cxx @@ -14,22 +14,12 @@ using namespace std; void addCustomOptions(bpo::options_description& options) { - options.add_options()("StoreFullTs", bpo::value<bool>()->default_value(false), - "Store digis vectors with full TS in addition to selected events if true"); - options.add_options()("OutFileName", bpo::value<std::string>()->default_value("mcbm_digis_events.root"), + options.add_options()("OutFileName", bpo::value<std::string>()->default_value(""), "Name (full or relative path) of the output .root file "); - options.add_options()("EvtNameIn", bpo::value<std::string>()->default_value("events"), - "MQ channel name for built events"); - options.add_options()("FillHistos", bpo::value<bool>()->default_value(false), - "Fill histograms and send them to histo server if true"); - - options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS"); - options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), - "Minimal time between two publishing"); - options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), - "Maximal time between two publishing"); - options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), - "MQ channel name for histos"); + options.add_options()("ChannelNameDataInput", bpo::value<std::string>()->default_value("events"), + "MQ channel name for digi events"); + options.add_options()("ChannelNameCommands", bpo::value<std::string>()->default_value("commands"), + "MQ channel name for commands"); } -FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDevEventSink(); } +FairMQDevicePtr getDevice(const FairMQProgOptions&) { return new CbmDevEventSink(); } diff --git a/reco/mq/startUnpack.sh.in b/reco/mq/startUnpack.sh.in index 1e598a20d0..34179a10cb 100755 --- a/reco/mq/startUnpack.sh.in +++ b/reco/mq/startUnpack.sh.in @@ -194,17 +194,10 @@ EVTSINK="MqDevEventSink" EVTSINK+=" --id evtsink1" EVTSINK+=" --severity info" #EVTSINK+=" --severity debug" -#EVTSINK+=" --StoreFullTs 1" EVTSINK+=" --OutFileName mcbm_digis_events.root" -EVTSINK+=" --FillHistos false" -EVTSINK+=" --PubFreqTs $_pubfreqts" -EVTSINK+=" --PubTimeMin $_pubminsec" -EVTSINK+=" --PubTimeMax $_pubmaxsec" -EVTSINK+=" --EvtNameIn events" +EVTSINK+=" --ChannelNameDataInput events" EVTSINK+=" --channel-config name=events,type=pull,method=bind,transport=zeromq,address=tcp://127.0.0.1:11557" -EVTSINK+=" --channel-config name=missedts,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11006" EVTSINK+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" -EVTSINK+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666" # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log EVTSINK_LOG="evtsink1_$LOGFILETAG" -- GitLab