From bf65d13dfa95884c4374ce5a8fedbf3bda26da13 Mon Sep 17 00:00:00 2001 From: "P.-A. Loizeau" <p.-a.loizeau@gsi.de> Date: Mon, 22 Nov 2021 18:20:00 +0100 Subject: [PATCH] [MQ] First working version of the Unpack + EvtBuild + EvtSink chain in MQ - Add CbmDeviceDigiEventSink class and DigiEventSink binary - In TsaMultiSampler, fix logic for emission of missing TS list when first missing is index 1 - Use vector ofCbmEvents instead of TClonesArray at output of BuildDigiEvents - Add missing features file for Unpack-Build-Sink chain - Add optional output of the full digi vectors in Event Sink => Tested with first file of run 1588, missing TS probably need better handling as too many empty fields --- MQ/mcbm/CMakeLists.txt | 31 +- MQ/mcbm/CbmDeviceBuildDigiEvents.cxx | 57 +- MQ/mcbm/CbmDeviceBuildDigiEvents.h | 7 - MQ/mcbm/CbmDeviceDigiEventSink.cxx | 764 ++++++++++++++++++++++ MQ/mcbm/CbmDeviceDigiEventSink.h | 169 +++++ MQ/mcbm/UnpBuildSink_missing_features.txt | 8 + MQ/mcbm/runDigiEventSink.cxx | 38 ++ MQ/mcbm/startBuildRawEvents2021.sh.in | 15 +- MQ/source/CbmMQTsaMultiSampler.cxx | 4 +- 9 files changed, 1057 insertions(+), 36 deletions(-) create mode 100644 MQ/mcbm/CbmDeviceDigiEventSink.cxx create mode 100644 MQ/mcbm/CbmDeviceDigiEventSink.h create mode 100644 MQ/mcbm/UnpBuildSink_missing_features.txt create mode 100644 MQ/mcbm/runDigiEventSink.cxx diff --git a/MQ/mcbm/CMakeLists.txt b/MQ/mcbm/CMakeLists.txt index ee2ca8bc54..9cfa5aecb9 100644 --- a/MQ/mcbm/CMakeLists.txt +++ b/MQ/mcbm/CMakeLists.txt @@ -21,7 +21,6 @@ set(INCLUDE_DIRECTORIES ${CBMROOT_SOURCE_DIR}/reco/detectors/trd/rawToDigiMethods ${CBMROOT_SOURCE_DIR}/reco/detectors/rich/unpack ${CBMDATA_DIR} - ${CBMDATA_DIR}/global ${CBMDATA_DIR}/raw ${CBMDATA_DIR}/sts ${CBMDATA_DIR}/much @@ -30,6 +29,8 @@ set(INCLUDE_DIRECTORIES ${CBMDATA_DIR}/psd ${CBMDATA_DIR}/trd ${CBMDATA_DIR}/mvd # Feint to avoid crash of DigiManager due to missing source pointer + ${CBMDATA_DIR}/base + ${CBMDATA_DIR}/global ${CBMBASE_DIR} ${CBMROOT_SOURCE_DIR}/sim/transport/steer # For CbmSetup.h! ${CBMROOT_SOURCE_DIR}/sim/transport/geosetup # For CbmGeoSetupDbProvider.h, needed by CbmSetup @@ -254,3 +255,31 @@ set(DEPENDENCIES RHTTP ) GENERATE_EXECUTABLE() + + +#set(INCLUDE_DIRECTORIES +# ${CBMDATA_DIR}/base +# $INCLUDE_DIRECTORIES +# ) + +set(EXE_NAME DigiEventSink) +set(SRCS CbmDeviceDigiEventSink.cxx runDigiEventSink.cxx) + +set(DEPENDENCIES + ${DEPENDENCIES} + ${FAIR_LIBS} + ${BOOST_LIBS} + fles_ipc + CbmFlibMcbm2018 + CbmFlibFlesTools + CbmBase + CbmData + Core + RIO + Tree + Net + Hist + RHTTP +) +#GENERATE_LIBRARY() +GENERATE_EXECUTABLE() diff --git a/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx b/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx index 04320bd48e..a73da1f822 100644 --- a/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx +++ b/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx @@ -17,8 +17,8 @@ #include "CbmMQDefs.h" #include "CbmMatch.h" #include "CbmMvdDigi.h" - #include "CbmTsEventHeader.h" + #include "TimesliceMetaData.h" /// FAIRROOT headers @@ -295,9 +295,6 @@ try { if (NULL == fTimeSliceMetaDataArray) { throw InitTaskError("Failed creating the TS meta data TClonesarray "); } fpAlgo->SetTimeSliceMetaDataArray(fTimeSliceMetaDataArray); - /// Create output TClonesArray - fEvents = new TClonesArray("CbmEvent", 500); - /// Now that everything is set, initialize the Algorithm if (kFALSE == fpAlgo->InitAlgo()) { throw InitTaskError("Failed to initilize the algorithm class."); } @@ -448,6 +445,19 @@ bool CbmDeviceBuildDigiEvents::HandleData(FairMQParts& parts, int /*index*/) TimesliceMetaData(std::move(*fTsMetaData)); ++uPartIdx; + LOG(debug) << "T0 Vector size: " << fvDigiT0->size(); + LOG(debug) << "STS Vector size: " << fvDigiSts->size(); + LOG(debug) << "MUCH Vector size: " << fvDigiMuch->size(); + LOG(debug) << "TRD Vector size: " << fvDigiTrd->size(); + LOG(debug) << "TOF Vector size: " << fvDigiTof->size(); + LOG(debug) << "RICH Vector size: " << fvDigiRich->size(); + LOG(debug) << "PSD Vector size: " << fvDigiPsd->size(); + + if (1 == fulNumMessages) { + /// First message received + fpAlgo->SetTsParameters(0, fTsMetaData->GetDuration(), fTsMetaData->GetOverlapDuration()); + } + /// Call Algo ProcessTs method fpAlgo->ProcessTs(); @@ -468,7 +478,6 @@ bool CbmDeviceBuildDigiEvents::HandleData(FairMQParts& parts, int /*index*/) /// Clear event vector after usage fpAlgo->ClearEventVector(); - fEvents->Clear("C"); /// Histograms management if (kTRUE == fbFillHistos) { @@ -489,32 +498,47 @@ bool CbmDeviceBuildDigiEvents::HandleData(FairMQParts& parts, int /*index*/) bool CbmDeviceBuildDigiEvents::SendEvents(FairMQParts& partsIn) { - /// Clear events TClonesArray before usage. - fEvents->Delete(); - /// Get vector reference from algo std::vector<CbmEvent*> vEvents = fpAlgo->GetEventVector(); - /// Move CbmEvent from temporary vector to TClonesArray + /// Move CbmEvent from temporary vector to std::vector of full objects + LOG(debug) << "Vector size: " << vEvents.size(); + std::vector<CbmEvent> vOutEvents; for (CbmEvent* event : vEvents) { - LOG(debug) << "Vector: " << event->ToString(); - new ((*fEvents)[fEvents->GetEntriesFast()]) CbmEvent(std::move(*event)); - LOG(debug) << "TClonesArray: " << static_cast<CbmEvent*>(fEvents->At(fEvents->GetEntriesFast() - 1))->ToString(); + LOG(debug) << "Vector ptr: " << event->ToString(); + vOutEvents.push_back(std::move(*event)); + LOG(debug) << "Vector obj: " << vOutEvents[(vOutEvents.size()) - 1].ToString(); } /// Serialize the array of events into a single MQ message + /// FIXME: Find out if possible to use only the boost serializer FairMQMessagePtr message(NewMessage()); - Serialize<RootSerializer>(*message, fEvents); + Serialize<RootSerializer>(*message, &(vOutEvents)); + /* + std::stringstream ossEvt; + boost::archive::binary_oarchive oaEvt(ossEvt); + oaEvt << vOutEvents; + std::string* strMsgEvt = new std::string(ossEvt.str()); +*/ /// Add it at the end of the input composed message + /// FIXME: Find out if possible to use only the boost serializer FairMQParts partsOut(std::move(partsIn)); partsOut.AddPart(std::move(message)); - + /* + partsOut.AddPart(NewMessage( + const_cast<char*>(strMsgEvt->c_str()), // data + strMsgEvt->length(), // size + [](void*, void* object) { delete static_cast<std::string*>(object); }, + strMsgEvt)); // object that manages the data +*/ if (Send(partsOut, fsChannelNameDataOutput) < 0) { LOG(error) << "Problem sending data to " << fsChannelNameDataOutput; return false; } + vOutEvents.clear(); + return true; } @@ -554,12 +578,7 @@ CbmDeviceBuildDigiEvents::~CbmDeviceBuildDigiEvents() fTimeSliceMetaDataArray->Clear(); delete fTsMetaData; - /// Clear events TClonesArray - fEvents->Delete(); - - delete fpRun; delete fTimeSliceMetaDataArray; - delete fEvents; delete fpAlgo; } diff --git a/MQ/mcbm/CbmDeviceBuildDigiEvents.h b/MQ/mcbm/CbmDeviceBuildDigiEvents.h index ea9749cf66..4c911765f1 100644 --- a/MQ/mcbm/CbmDeviceBuildDigiEvents.h +++ b/MQ/mcbm/CbmDeviceBuildDigiEvents.h @@ -109,13 +109,6 @@ private: TClonesArray* fTimeSliceMetaDataArray = nullptr; //! TimesliceMetaData* fTsMetaData = nullptr; - /// Data emission - TClonesArray* fEvents = nullptr; //! output container of CbmEvents - // std::vector< CbmEvent * > & fEventVector; //! vector with all created events - - /// Internal data registration (for FairRootManager -> DigiManager links) - FairRunOnline* fpRun = nullptr; - /// Array of histograms to send to the histogram server TObjArray fArrayHisto = {}; /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.cxx b/MQ/mcbm/CbmDeviceDigiEventSink.cxx new file mode 100644 index 0000000000..21b771f612 --- /dev/null +++ b/MQ/mcbm/CbmDeviceDigiEventSink.cxx @@ -0,0 +1,764 @@ +/* Copyright (C) 2020-2021 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +/** + * CbmDeviceDigiEventSink.cxx + * + * @since 2020-05-24 + * @author P.-A. Loizeau + */ + +#include "CbmDeviceDigiEventSink.h" + + +/// CBM headers +#include "CbmEvent.h" +#include "CbmFlesCanvasTools.h" +#include "CbmMQDefs.h" + +#include "TimesliceMetaData.h" + +/// FAIRROOT headers +#include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig +#include "FairParGenericSet.h" +#include "FairRootFileSink.h" +#include "FairRootManager.h" +#include "FairRunOnline.h" + +#include "BoostSerializer.h" + +#include "RootSerializer.h" + +/// FAIRSOFT headers (geant, boost, ...) +#include "TCanvas.h" +#include "TFile.h" +#include "TH1.h" +#include "TList.h" +#include "TNamed.h" + +#include <boost/archive/binary_iarchive.hpp> +#include <boost/serialization/utility.hpp> + +/// C/C++ headers +#include <thread> // this_thread::sleep_for + +#include <array> +#include <iomanip> +#include <stdexcept> +#include <string> +struct InitTaskError : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +using namespace std; + +//Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE; + +CbmDeviceDigiEventSink::CbmDeviceDigiEventSink() {} + +void CbmDeviceDigiEventSink::InitTask() +try { + /// Read options from executable + LOG(info) << "Init options for CbmDeviceDigiEventSink."; + + fbStoreFullTs = fConfig->GetValue<bool>("StoreFullTs"); + fsOutputFileName = fConfig->GetValue<std::string>("OutFileName"); + + fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn"); + fsAllowedChannels[0] = fsChannelNameDataInput; + + fbFillHistos = fConfig->GetValue<bool>("FillHistos"); + fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); + fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg"); + fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg"); + fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); + fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); + fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); + + /// Associate the MissedTs Channel to the corresponding handler + OnData(fsChannelNameMissedTs, &CbmDeviceDigiEventSink::HandleMissTsData); + + /// Associate the command Channel to the corresponding handler + OnData(fsChannelNameCommands, &CbmDeviceDigiEventSink::HandleCommand); + + /// Associate the Event + Unp data Channel to the corresponding handler + // Get the information about created channels from the device + // Check if the defined channels from the topology (by name) + // are in the list of channels which are possible/allowed + // for the device + // The idea is to check at initilization if the devices are + // properly connected. For the time beeing this is done with a + // nameing convention. It is not avoided that someone sends other + // data on this channel. + //logger::SetLogLevel("INFO"); + int noChannel = fChannels.size(); + LOG(info) << "Number of defined channels: " << noChannel; + for (auto const& entry : fChannels) { + LOG(info) << "Channel name: " << entry.first; + if (std::string::npos != entry.first.find(fsChannelNameDataInput)) { + if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); + OnData(entry.first, &CbmDeviceDigiEventSink::HandleData); + } // if( entry.first.find( "ts" ) + } // for( auto const &entry : fChannels ) + + // InitContainers(); + + /// Prepare storage TClonesArrays + /// TS MetaData storage + fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1); + if (NULL == fTimeSliceMetaDataArray) { + throw InitTaskError("Failed creating the TS meta data TClonesarray "); + } // if( NULL == fTimeSliceMetaDataArray ) + /// Events storage + /// TODO: remove TObject from CbmEvent and switch to vectors! + fEventsSel = new std::vector<CbmDigiEvent>(); + + /// Prepare root output + if ("" != fsOutputFileName) { + fpRun = new FairRunOnline(); + fpFairRootMgr = FairRootManager::Instance(); + fpFairRootMgr->SetSink(new FairRootFileSink(fsOutputFileName)); + if (nullptr == fpFairRootMgr->GetOutFile()) { + throw InitTaskError("Could not open root file"); + } // if( nullptr == fpFairRootMgr->GetOutFile() ) + } // if( "" != fsOutputFileName ) + else { + throw InitTaskError("Empty output filename!"); + } // else of if( "" != fsOutputFileName ) + + LOG(info) << "Init Root Output to " << fsOutputFileName; + + fpFairRootMgr->InitSink(); + fEvtHeader = new CbmTsEventHeader(); + fpFairRootMgr->Register("EventHeader.", "Event", fEvtHeader, kTRUE); + + /// Register all input data members with the FairRoot manager + /// TS MetaData + fpFairRootMgr->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kTRUE); + /// CbmEvent + fpFairRootMgr->RegisterAny("DigiEvent", fEventsSel, kTRUE); + + /// Full TS Digis storage (optional usage, controlled by fbStoreFullTs!) + if (fbStoreFullTs) { + fvDigiT0 = new std::vector<CbmTofDigi>(); + fvDigiSts = new std::vector<CbmStsDigi>(); + fvDigiMuch = new std::vector<CbmMuchDigi>(); + fvDigiTrd = new std::vector<CbmTrdDigi>(); + fvDigiTof = new std::vector<CbmTofDigi>(); + fvDigiRich = new std::vector<CbmRichDigi>(); + fvDigiPsd = new std::vector<CbmPsdDigi>(); + + fpFairRootMgr->RegisterAny("T0Digi", fvDigiT0, kTRUE); + fpFairRootMgr->RegisterAny("StsDigi", fvDigiSts, kTRUE); + fpFairRootMgr->RegisterAny("MuchDigi", fvDigiMuch, kTRUE); + fpFairRootMgr->RegisterAny("TrdDigi", fvDigiTrd, kTRUE); + fpFairRootMgr->RegisterAny("TofDigi", fvDigiTof, kTRUE); + fpFairRootMgr->RegisterAny("RichDigi", fvDigiRich, kTRUE); + fpFairRootMgr->RegisterAny("PsdDigi", fvDigiPsd, kTRUE); + } + + fpFairRootMgr->WriteFolder(); + + LOG(info) << "Initialized outTree with rootMgr at " << fpFairRootMgr; + + /// Histograms management + if (kTRUE == fbFillHistos) { + /* + /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) + std::vector< std::pair< TNamed *, std::string > > vHistos = fpAlgo->GetHistoVector(); + /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) + std::vector< std::pair< TCanvas *, std::string > > vCanvases = fpAlgo->GetCanvasVector(); + + /// Add pointers to each histo in the histo array + /// Create histo config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > + /// and send it through a separate channel using the BoostSerializer + for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + { +// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() +// << " in " << vHistos[ uHisto ].second.data() +// ; + fArrayHisto.Add( vHistos[ uHisto ].first ); + std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(), + vHistos[ uHisto ].second ); + fvpsHistosFolder.push_back( psHistoConfig ); + + /// Serialize the vector of histo config into a single MQ message + FairMQMessagePtr messageHist( NewMessage() ); + Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig ); + + /// Send message to the common histogram config messages queue + if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) + { + throw InitTaskError( "Problem sending histo config" ); + } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) + + LOG(info) << "Config of hist " << psHistoConfig.first.data() + << " in folder " << psHistoConfig.second.data() ; + } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) + + /// Create canvas config vector + /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > + /// and send it through a separate channel using the BoostSerializer + for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) + { +// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() +// << " in " << vCanvases[ uCanv ].second.data(); + std::string sCanvName = (vCanvases[ uCanv ].first)->GetName(); + std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first ); + + std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf ); + + fvpsCanvasConfig.push_back( psCanvConfig ); + + /// Serialize the vector of canvas config into a single MQ message + FairMQMessagePtr messageCan( NewMessage() ); + Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig ); + + /// Send message to the common canvas config messages queue + if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) + { + throw InitTaskError( "Problem sending canvas config" ); + } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) + + LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() + << " is " << psCanvConfig.second.data() ; + } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) +*/ + } // if( kTRUE == fbFillHistos ) +} +catch (InitTaskError& e) { + LOG(error) << e.what(); + // Wrapper defined in CbmMQDefs.h to support different FairMQ versions + cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); +} + +bool CbmDeviceDigiEventSink::IsChannelNameAllowed(std::string channelName) +{ + for (auto const& entry : fsAllowedChannels) { + std::size_t pos1 = channelName.find(entry); + if (pos1 != std::string::npos) { + const vector<std::string>::const_iterator pos = + std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry); + const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin(); + LOG(info) << "Found " << entry << " in " << channelName; + LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx; + return true; + } // if (pos1!=std::string::npos) + } // for(auto const &entry : fsAllowedChannels) + LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names."; + LOG(error) << "Stop device."; + return false; +} +//--------------------------------------------------------------------// +// handler is called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and a sub-channel index (here 0) +bool CbmDeviceDigiEventSink::HandleMissTsData(FairMQMessagePtr& msg, int /*index*/) +{ + std::vector<uint64_t> vIndices; + std::string msgStrMissTs(static_cast<char*>(msg->GetData()), msg->GetSize()); + std::istringstream issMissTs(msgStrMissTs); + boost::archive::binary_iarchive inputArchiveMissTs(issMissTs); + inputArchiveMissTs >> vIndices; + + fvulMissedTsIndices.insert(fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end()); + + /// Check TS queue and process it if needed (in case it filled a hole!) + CheckTsQueues(); + + return true; +} +//--------------------------------------------------------------------// +// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) +bool CbmDeviceDigiEventSink::HandleData(FairMQParts& parts, int /*index*/) +{ + fulNumMessages++; + LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts" + << ", size0: " << parts.At(0)->GetSize(); + + if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; + + /// Unpack the message + CbmEventTimeslice unpTs(parts); + + /// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!! + LOG(debug) << "Next TS check " << fuPrevTsIndex << " " << fulTsCounter << " " << unpTs.fTsMetaData.GetIndex(); + if (fuPrevTsIndex + 1 == unpTs.fTsMetaData.GetIndex() + || (0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == unpTs.fTsMetaData.GetIndex())) { + LOG(debug) << "TS direct to dump"; + /// Fill all storage variables registers for data output + PrepareTreeEntry(unpTs); + /// Trigger FairRoot manager to dump Tree entry + DumpTreeEntry(); + /// Update counters + fuPrevTsIndex = unpTs.fTsMetaData.GetIndex(); + fulTsCounter++; + } + else { + LOG(debug) << "TS direct to storage"; + /// If not consecutive to last TS sent, + fmFullTsStorage.emplace_hint(fmFullTsStorage.end(), + std::pair<uint64_t, CbmEventTimeslice>(unpTs.fTsMetaData.GetIndex(), unpTs)); + } + LOG(debug) << "TS metadata checked"; + + /// Clear metadata => crashes, maybe not needed as due to move the pointer is invalidated? + // delete fTsMetaData; + + /// Check TS queue and process it if needed (in case it filled a hole!) + CheckTsQueues(); + LOG(debug) << "TS queues checked"; + + /// Histograms management + if (kTRUE == fbFillHistos) { + /// Send histograms each 100 time slices. Should be each ~1s + /// Use also runtime checker to trigger sending after M s if + /// processing too slow or delay sending if processing too fast + std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); + std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; + if ((fdMaxPublishTime < elapsedSeconds.count()) + || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { + SendHistograms(); + fLastPublishTime = std::chrono::system_clock::now(); + } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) + } // if( kTRUE == fbFillHistos ) + + LOG(debug) << "Processed TS with saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter + << " full ones and " << fulMissedTsCounter << " missed/empty ones)"; + LOG(debug) << "Buffers are " << fmFullTsStorage.size() << " full TS and " << fvulMissedTsIndices.size() + << " missed/empty ones)"; + LOG(debug) << "Buffers are " << fmFullTsStorage.size() << " full TS and " << fvulMissedTsIndices.size() + << " missed/empty ones)"; + + return true; +} +//--------------------------------------------------------------------// +bool CbmDeviceDigiEventSink::HandleCommand(FairMQMessagePtr& msg, int /*index*/) +{ + /* + std::string sCommand( static_cast< char * >( msg->GetData() ), + msg->GetSize() ); +*/ + std::string sCommand; + std::string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize()); + std::istringstream issCmd(msgStrCmd); + boost::archive::binary_iarchive inputArchiveCmd(issCmd); + inputArchiveCmd >> sCommand; + + std::string sCmdTag = sCommand; + size_t charPosDel = sCommand.find(' '); + if (std::string::npos != charPosDel) { + sCmdTag = sCommand.substr(0, charPosDel); + } // if( std::string::npos != charPosDel ) + + if ("EOF" == sCmdTag) { + fbReceivedEof = true; + + /// Extract the last TS index and global full TS count + if (std::string::npos == charPosDel) { + LOG(fatal) << "CbmDeviceDigiEventSink::HandleCommand => " + << "Incomplete EOF command received: " << sCommand; + return false; + } // if( std::string::npos == charPosDel ) + /// Last TS index + charPosDel++; + std::string sNext = sCommand.substr(charPosDel); + charPosDel = sNext.find(' '); + + if (std::string::npos == charPosDel) { + LOG(fatal) << "CbmDeviceDigiEventSink::HandleCommand => " + << "Incomplete EOF command received: " << sCommand; + return false; + } // if( std::string::npos == charPosDel ) + fuLastTsIndex = std::stoul(sNext.substr(0, charPosDel)); + /// Total TS count + charPosDel++; + fuTotalTsCount = std::stoul(sNext.substr(charPosDel)); + + LOG(info) << "CbmDeviceDigiEventSink::HandleCommand => " + << "Received EOF command with final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount; + /// End of data: clean save of data + close file + send last state of histos if enabled + if (fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) { + LOG(info) << "CbmDeviceDigiEventSink::HandleCommand => " + << "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount; + Finish(); + } // if( fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount ) + } // if( "EOF" == sCmdTag ) + else if ("STOP" == sCmdTag) { + /// TODO: different treatment in case of "BAD" ending compared to EOF? + /// Source failure: clean save of received data + close file + send last state of histos if enabled + Finish(); + } // else if( "STOP" == sCmdTag ) + else { + LOG(warning) << "Unknown command received: " << sCmdTag << " => will be ignored!"; + } // else if command not recognized + + return true; +} +//--------------------------------------------------------------------// +void CbmDeviceDigiEventSink::CheckTsQueues() +{ + bool bHoleFoundInBothQueues = false; + + std::map<uint64_t, CbmEventTimeslice>::iterator itFullTs = fmFullTsStorage.begin(); + std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin(); + + while (!bHoleFoundInBothQueues) { + /// Check if the first TS in the full TS queue is the next one + if (fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first) { + /// Fill all storage variables registers for data output + PrepareTreeEntry((*itFullTs).second); + /// Trigger FairRoot manager to dump Tree entry + DumpTreeEntry(); + + /// Update counters + fuPrevTsIndex = (*itFullTs).first; + fulTsCounter++; + + /// Increment iterator + ++itFullTs; + continue; + } // if( fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first() ) + if (fmFullTsStorage.end() != itFullTs) + LOG(debug) << "CbmDeviceDigiEventSink::CheckTsQueues => Full TS " << (*itFullTs).first << " VS " + << (fuPrevTsIndex + 1); + /// Check if the first TS in the missed TS queue is the next one + if (fvulMissedTsIndices.end() != itMissTs + && ((0 == fuPrevTsIndex && fuPrevTsIndex == (*itMissTs)) + || ((0 < fulTsCounter || 0 < fulMissedTsCounter) && fuPrevTsIndex + 1 == (*itMissTs)))) { + + /// Prepare entry with only dummy TS metadata and empty storage variables + new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()]) + TimesliceMetaData(0, 0, 0, (*itMissTs)); + + /// Trigger FairRoot manager to dump Tree entry + DumpTreeEntry(); + + /// Update counters + fuPrevTsIndex = (*itMissTs); + fulMissedTsCounter++; + + /// Increment iterator + ++itMissTs; + continue; + } // if( fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs ) ) + if (fvulMissedTsIndices.end() != itMissTs) + LOG(debug) << "CbmDeviceDigiEventSink::CheckTsQueues => Empty TS " << (*itMissTs) << " VS " + << (fuPrevTsIndex + 1); + + /// Should be reached only if both queues at the end or hole found in both + bHoleFoundInBothQueues = true; + } // while( !bHoleFoundInBothQueues ) + + /// Delete the processed entries + fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs); + fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs); + + /// End of data: clean save of data + close file + send last state of histos if enabled + if (fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) { + LOG(info) << "CbmDeviceDigiEventSink::CheckTsQueues => " + << "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount; + Finish(); + } // if( fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount ) +} +//--------------------------------------------------------------------// +void CbmDeviceDigiEventSink::PrepareTreeEntry(CbmEventTimeslice unpTs) +{ + /// FIXME: poor man solution with lots of data copy until we undertsnad how to properly deal + /// with FairMq messages ownership and memory managment + + (*fEvtHeader) = std::move(unpTs.fCbmTsEventHeader); + + /// FIXME: Not sure if this is the proper way to insert the data + new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()]) + TimesliceMetaData(std::move(unpTs.fTsMetaData)); + + /// Extract CbmEvent TClonesArray from input message + (*fEventsSel) = std::move(unpTs.GetSelectedData()); + + /// Full TS Digis storage (optional usage, controlled by fbStoreFullTs!) + if (fbStoreFullTs) { + if( 0 < unpTs.fvDigiT0.size() ) fvDigiT0->assign( unpTs.fvDigiT0.begin(), unpTs.fvDigiT0.end() ); + if( 0 < unpTs.fvDigiSts.size() ) fvDigiSts->assign( unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end() ); + if( 0 < unpTs.fvDigiMuch.size() ) fvDigiMuch->assign( unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end() ); + if( 0 < unpTs.fvDigiTrd.size() ) fvDigiTrd->assign( unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end() ); + if( 0 < unpTs.fvDigiTof.size() ) fvDigiTof->assign( unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end() ); + if( 0 < unpTs.fvDigiRich.size() ) fvDigiRich->assign( unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end() ); + if( 0 < unpTs.fvDigiPsd.size() ) fvDigiPsd->assign( unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end() ); + } +} +void CbmDeviceDigiEventSink::DumpTreeEntry() +{ + // Unpacked digis + CbmEvent output to root file + /* + * NH style +// fpFairRootMgr->FillEventHeader(fEvtHeader); +// LOG(info) << "Fill WriteOutBuffer with FairRootManager at " << fpFairRootMgr; +// fpOutRootFile->cd(); + fpFairRootMgr->Fill(); + fpFairRootMgr->StoreWriteoutBufferData( fpFairRootMgr->GetEventTime() ); + //fpFairRootMgr->StoreAllWriteoutBufferData(); + fpFairRootMgr->DeleteOldWriteoutBufferData(); +*/ + /// FairRunOnline style + fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime()); + fpFairRootMgr->FillEventHeader(fEvtHeader); + fpFairRootMgr->Fill(); + fpFairRootMgr->DeleteOldWriteoutBufferData(); +// fpFairRootMgr->Write(); + + /// Clear metadata array + fTimeSliceMetaDataArray->Clear(); + + /// Clear event vector + fEventsSel->clear(); + /// Full TS Digis storage (optional usage, controlled by fbStoreFullTs!) + if (fbStoreFullTs) { + fvDigiT0->clear(); + fvDigiSts->clear(); + fvDigiMuch->clear(); + fvDigiTrd->clear(); + fvDigiTof->clear(); + fvDigiRich->clear(); + fvDigiPsd->clear(); + } +} + +//--------------------------------------------------------------------// +bool CbmDeviceDigiEventSink::SendHistograms() +{ + /// Serialize the array of histos into a single MQ message + FairMQMessagePtr message(NewMessage()); + Serialize<RootSerializer>(*message, &fArrayHisto); + + /// Send message to the common histogram messages queue + if (Send(message, fsChannelNameHistosInput) < 0) { + LOG(error) << "Problem sending data"; + return false; + } // if( Send( message, fsChannelNameHistosInput ) < 0 ) + + /// Reset the histograms after sending them (but do not reset the time) + // fpAlgo->ResetHistograms( kFALSE ); + + return true; +} + +//--------------------------------------------------------------------// +CbmDeviceDigiEventSink::~CbmDeviceDigiEventSink() +{ + /// FIXME: Add pointers check before delete + + /// Close things properly if not alredy done + if (!fbFinishDone) Finish(); + + /// Clear events vector + fEventsSel->clear(); + delete fEventsSel; + + delete fpRun; +} + +void CbmDeviceDigiEventSink::Finish() +{ + // Clean closure of output to root file + fpFairRootMgr->Write(); + // fpFairRootMgr->GetSource()->Close(); + fpFairRootMgr->CloseSink(); + LOG(info) << "File closed after saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter + << " full ones and " << fulMissedTsCounter << " missed/empty ones)"; + + if (kTRUE == fbFillHistos) { + SendHistograms(); + fLastPublishTime = std::chrono::system_clock::now(); + } // if( kTRUE == fbFillHistos ) + + ChangeState(fair::mq::Transition::Stop); + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + ChangeState(fair::mq::Transition::End); + + fbFinishDone = kTRUE; +} + +CbmEventTimeslice::CbmEventTimeslice(FairMQParts& parts) +{ + /// Extract unpacked data from input message + uint32_t uPartIdx = 0; + + /// TODO: code order of vectors in the TS header!! + + /// TS header + TObject* tempObjectPointer = nullptr; + RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); + if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("CbmTsEventHeader")) { + fCbmTsEventHeader = *(static_cast<CbmTsEventHeader*>(tempObjectPointer)); + } + else { + LOG(fatal) << "Failed to deserialize the TS header"; + } + ++uPartIdx; + + /// T0 + std::string msgStrT0(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issT0(msgStrT0); + boost::archive::binary_iarchive inputArchiveT0(issT0); + inputArchiveT0 >> fvDigiT0; + ++uPartIdx; + + /// STS + std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issSts(msgStrSts); + boost::archive::binary_iarchive inputArchiveSts(issSts); + inputArchiveSts >> fvDigiSts; + ++uPartIdx; + + /// MUCH + std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issMuch(msgStrMuch); + boost::archive::binary_iarchive inputArchiveMuch(issMuch); + inputArchiveMuch >> fvDigiMuch; + ++uPartIdx; + + /// TRD + std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issTrd(msgStrTrd); + boost::archive::binary_iarchive inputArchiveTrd(issTrd); + inputArchiveTrd >> fvDigiTrd; + ++uPartIdx; + + /// T0F + std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issTof(msgStrTof); + boost::archive::binary_iarchive inputArchiveTof(issTof); + inputArchiveTof >> fvDigiTof; + ++uPartIdx; + + /// RICH + std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issRich(msgStrRich); + boost::archive::binary_iarchive inputArchiveRich(issRich); + inputArchiveRich >> fvDigiRich; + ++uPartIdx; + + /// PSD + std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issPsd(msgStrPsd); + boost::archive::binary_iarchive inputArchivePsd(issPsd); + inputArchivePsd >> fvDigiPsd; + ++uPartIdx; + + /// TS metadata + tempObjectPointer = nullptr; + RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); + + if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) { + fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectPointer)); + } + else { + LOG(fatal) << "Failed to deserialize the TS metadata"; + } + ++uPartIdx; + + /// Events + /// FIXME: Find out if possible to use only the boost serializer/deserializer + /* + std::string msgStrEvt(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issEvt(msgStrEvt); + boost::archive::binary_iarchive inputArchiveEvt(issEvt); + inputArchiveEvt >> fvEvents; + ++uPartIdx; + LOG(info) << "Input event array " << fvEvents.size(); +*/ + std::vector<CbmEvent>* pvOutEvents = nullptr; + RootSerializer().Deserialize(*parts.At(uPartIdx), pvOutEvents); + fvEvents = std::move(*pvOutEvents); + LOG(debug) << "Input event array " << fvEvents.size(); +} + +CbmEventTimeslice::~CbmEventTimeslice() +{ + fvDigiT0.clear(); + fvDigiSts.clear(); + fvDigiMuch.clear(); + fvDigiTrd.clear(); + fvDigiTof.clear(); + fvDigiRich.clear(); + fvDigiPsd.clear(); + fvEvents.clear(); +} + + +std::vector<CbmDigiEvent> CbmEventTimeslice::GetSelectedData() +{ + std::vector<CbmDigiEvent> vEventsSel; + vEventsSel.reserve(fvEvents.size()); + + /// Loop on events in input vector + for (CbmEvent event : fvEvents) { + CbmDigiEvent selEvent; + selEvent.fTime = event.GetStartTime(); + selEvent.fNumber = event.GetNumber(); + + /// for each detector, find the data in the Digi vectors and copy them + /// TODO: Template + loop on list of data types? + /// ==> T0 + uint32_t uNbDigis = (0 < event.GetNofData(ECbmDataType::kT0Digi) ? event.GetNofData(ECbmDataType::kT0Digi) : 0); + if (uNbDigis) { + for (uint32_t uDigiIdx = 0; uDigiIdx < uNbDigis; ++uDigiIdx) { + selEvent.fData.fT0.fDigis.push_back(fvDigiT0[event.GetIndex(ECbmDataType::kT0Digi, uDigiIdx)]); + } + } + + /// ==> STS + uNbDigis = (0 < event.GetNofData(ECbmDataType::kStsDigi) ? event.GetNofData(ECbmDataType::kStsDigi) : 0); + if (uNbDigis) { + for (uint32_t uDigiIdx = 0; uDigiIdx < uNbDigis; ++uDigiIdx) { + selEvent.fData.fSts.fDigis.push_back(fvDigiSts[event.GetIndex(ECbmDataType::kStsDigi, uDigiIdx)]); + } + } + + /// ==> MUCH + uNbDigis = (0 < event.GetNofData(ECbmDataType::kMuchDigi) ? event.GetNofData(ECbmDataType::kMuchDigi) : 0); + if (uNbDigis) { + for (uint32_t uDigiIdx = 0; uDigiIdx < uNbDigis; ++uDigiIdx) { + selEvent.fData.fMuch.fDigis.push_back(fvDigiMuch[event.GetIndex(ECbmDataType::kMuchDigi, uDigiIdx)]); + } + } + + /// ==> TRD + uNbDigis = (0 < event.GetNofData(ECbmDataType::kTrdDigi) ? event.GetNofData(ECbmDataType::kTrdDigi) : 0); + if (uNbDigis) { + for (uint32_t uDigiIdx = 0; uDigiIdx < uNbDigis; ++uDigiIdx) { + selEvent.fData.fTrd.fDigis.push_back(fvDigiTrd[event.GetIndex(ECbmDataType::kTrdDigi, uDigiIdx)]); + } + } + + /// ==> TOF + uNbDigis = (0 < event.GetNofData(ECbmDataType::kTofDigi) ? event.GetNofData(ECbmDataType::kTofDigi) : 0); + if (uNbDigis) { + for (uint32_t uDigiIdx = 0; uDigiIdx < uNbDigis; ++uDigiIdx) { + selEvent.fData.fTof.fDigis.push_back(fvDigiTof[event.GetIndex(ECbmDataType::kTofDigi, uDigiIdx)]); + } + } + + /// ==> RICH + uNbDigis = (0 < event.GetNofData(ECbmDataType::kRichDigi) ? event.GetNofData(ECbmDataType::kRichDigi) : 0); + if (uNbDigis) { + for (uint32_t uDigiIdx = 0; uDigiIdx < uNbDigis; ++uDigiIdx) { + selEvent.fData.fRich.fDigis.push_back(fvDigiRich[event.GetIndex(ECbmDataType::kRichDigi, uDigiIdx)]); + } + } + + /// ==> PSD + uNbDigis = (0 < event.GetNofData(ECbmDataType::kPsdDigi) ? event.GetNofData(ECbmDataType::kPsdDigi) : 0); + if (uNbDigis) { + for (uint32_t uDigiIdx = 0; uDigiIdx < uNbDigis; ++uDigiIdx) { + selEvent.fData.fPsd.fDigis.push_back(fvDigiPsd[event.GetIndex(ECbmDataType::kPsdDigi, uDigiIdx)]); + } + } + + vEventsSel.push_back(selEvent); + } + + return vEventsSel; +} diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.h b/MQ/mcbm/CbmDeviceDigiEventSink.h new file mode 100644 index 0000000000..3090e1b8fb --- /dev/null +++ b/MQ/mcbm/CbmDeviceDigiEventSink.h @@ -0,0 +1,169 @@ +/* Copyright (C) 2020-2021 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +/** + * CbmDeviceDigiEventSink.h + * + * @since 2020-05-04 + * @author P.-A. Loizeau + */ + +#ifndef CBMDEVICEDIGIEVTSINK_H_ +#define CBMDEVICEDIGIEVTSINK_H_ + +/// CBM headers +#include "CbmDigiEvent.h" +#include "CbmEvent.h" +#include "CbmMqTMessage.h" +#include "CbmMuchDigi.h" +#include "CbmPsdDigi.h" +#include "CbmRichDigi.h" +#include "CbmStsDigi.h" +#include "CbmTofDigi.h" +#include "CbmTrdDigi.h" +#include "CbmTsEventHeader.h" + +#include "TimesliceMetaData.h" + +/// FAIRROOT headers +#include "FairMQDevice.h" + +/// FAIRSOFT headers (geant, boost, ...) +#include "Rtypes.h" +#include "TClonesArray.h" +#include "TObjArray.h" + +/// C/C++ headers +#include <chrono> +#include <map> +#include <vector> + +class TFile; +class TList; +class TClonesArray; +//class TimesliceMetaData; +class FairRunOnline; +class FairRootManager; + +class CbmEventTimeslice { + /// TODO: rename to CbmTsWithEvents +public: + CbmEventTimeslice(FairMQParts& parts); + ~CbmEventTimeslice(); + + std::vector<CbmDigiEvent> GetSelectedData(); + + /// TS information in header + CbmTsEventHeader fCbmTsEventHeader; + std::vector<CbmTofDigi> fvDigiT0; + std::vector<CbmStsDigi> fvDigiSts; + std::vector<CbmMuchDigi> fvDigiMuch; + std::vector<CbmTrdDigi> fvDigiTrd; + std::vector<CbmTofDigi> fvDigiTof; + std::vector<CbmRichDigi> fvDigiRich; + std::vector<CbmPsdDigi> fvDigiPsd; + TimesliceMetaData fTsMetaData; + std::vector<CbmEvent> fvEvents; +}; + +class CbmDeviceDigiEventSink : public FairMQDevice { +public: + CbmDeviceDigiEventSink(); + virtual ~CbmDeviceDigiEventSink(); + +protected: + virtual void InitTask(); + bool HandleMissTsData(FairMQMessagePtr&, int); + bool HandleData(FairMQParts&, int); + bool HandleCommand(FairMQMessagePtr&, int); + +private: + /// Constants + + /// Control flags + Bool_t fbStoreFullTs = false; //! If true, store digis vectors with full TS in addition to selected events + Bool_t fbFillHistos = false; //! Switch ON/OFF filling of histograms + Bool_t fbFinishDone = false; //! Keep track of whether the Finish was already called + + /// User settings parameters + /// Algo enum settings + std::string fsOutputFileName = "mcbm_digis_events.root"; + /// message queues + std::string fsChannelNameMissedTs = "missedts"; + std::string fsChannelNameDataInput = "events"; + std::string fsChannelNameCommands = "commands"; + std::string fsChannelNameHistosInput = "histogram-in"; + std::string fsChannelNameHistosConfig = "histo-conf"; + std::string fsChannelNameCanvasConfig = "canvas-conf"; + /// Histograms management + uint32_t fuPublishFreqTs = 100; + double_t fdMinPublishTime = 0.5; + double_t fdMaxPublishTime = 5.0; + + /// List of MQ channels names + std::vector<std::string> fsAllowedChannels = {fsChannelNameDataInput}; + + /// Parameters management + // TList* fParCList = nullptr; + // Bool_t InitParameters( TList* fParCList ); + + /// Statistics & missed TS detection + uint64_t fuPrevTsIndex = 0; + uint64_t fulNumMessages = 0; + uint64_t fulTsCounter = 0; + uint64_t fulMissedTsCounter = 0; + std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); + + /// Control Commands reception + bool fbReceivedEof = false; + uint64_t fuLastTsIndex = 0; + uint64_t fuTotalTsCount = 0; + + /// Data reception + /// Event (TS) header + CbmTsEventHeader* fEvtHeader = nullptr; + /// TS MetaData storage + TClonesArray* fTimeSliceMetaDataArray = nullptr; //! + TimesliceMetaData* fTsMetaData = nullptr; + /// CbmEvents + std::vector<CbmDigiEvent>* fEventsSel = nullptr; //! output container of CbmEvents + /// Full TS Digis storage (optional usage, controlled by fbStoreFullTs!) + std::vector<CbmTofDigi>* fvDigiT0 = nullptr; + std::vector<CbmStsDigi>* fvDigiSts = nullptr; + std::vector<CbmMuchDigi>* fvDigiMuch = nullptr; + std::vector<CbmTrdDigi>* fvDigiTrd = nullptr; + std::vector<CbmTofDigi>* fvDigiTof = nullptr; + std::vector<CbmRichDigi>* fvDigiRich = nullptr; + std::vector<CbmPsdDigi>* fvDigiPsd = nullptr; + + /// Storage for re-ordering + /// Missed TS vector + std::vector<uint64_t> fvulMissedTsIndices = {}; + /// Buffered TS + std::map<uint64_t, CbmEventTimeslice> fmFullTsStorage = {}; + + /// Data storage + FairRunOnline* fpRun = nullptr; + FairRootManager* fpFairRootMgr = nullptr; + + /// Array of histograms to send to the histogram server + TObjArray fArrayHisto = {}; + /// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server + std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {}; + /// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server + /// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)" + std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {}; + + /// Internal methods + bool IsChannelNameAllowed(std::string channelName); + // Bool_t InitContainers(); + void CheckTsQueues(); + void PrepareTreeEntry(CbmEventTimeslice unpTs); + void DumpTreeEntry(); + bool SendHistograms(); + void Finish(); +}; + +#endif /* CBMDEVICEDIGIEVTSINK_H_ */ diff --git a/MQ/mcbm/UnpBuildSink_missing_features.txt b/MQ/mcbm/UnpBuildSink_missing_features.txt new file mode 100644 index 0000000000..70add5ccf5 --- /dev/null +++ b/MQ/mcbm/UnpBuildSink_missing_features.txt @@ -0,0 +1,8 @@ +High priority +- +++++ Switch Sampler-Unpackers connection from Push-Pull to Rep-Req! Highest priority for memory and load-balancing performances! See monitoring examples! +- Make the parameter server the single source for the CbmSetup object (remove all disk accesses in Unpacker device!) + +Low priority +- Pub-Sub Queue from Sink to intermediate devices (unpackers, Event builders, ?calibrators?, ...) to signal that last TS was received and dumped and Transition + `Active -> Ready -> Stop -> End` can be done (e.g. through a Finish method) +- Empty/custom TsEventHeader and event vector when dumping missing TS in Sink (vector maybe already OK) diff --git a/MQ/mcbm/runDigiEventSink.cxx b/MQ/mcbm/runDigiEventSink.cxx new file mode 100644 index 0000000000..bd3d5a6a7a --- /dev/null +++ b/MQ/mcbm/runDigiEventSink.cxx @@ -0,0 +1,38 @@ +/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +#include "CbmDeviceDigiEventSink.h" + +#include <iomanip> +#include <string> + +#include "runFairMQDevice.h" + +namespace bpo = boost::program_options; +using namespace std; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options()("StoreFullTs", bpo::value<bool>()->default_value(false), + "Store digis vectors with full TS in addition to selected events if true"); + options.add_options()("OutFileName", bpo::value<std::string>()->default_value("mcbm_digis_events.root"), + "Name (full or relative path) of the output .root file "); + options.add_options()("EvtNameIn", bpo::value<std::string>()->default_value("events"), + "MQ channel name for built events"); + options.add_options()("FillHistos", bpo::value<bool>()->default_value(false), + "Fill histograms and send them to histo server if true"); + options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"), + "MQ channel name for histos"); + options.add_options()("ChNameHistCfg", bpo::value<std::string>()->default_value("histo-conf"), + "MQ channel name for histos config"); + options.add_options()("ChNameCanvCfg", bpo::value<std::string>()->default_value("canvas-conf"), + "MQ channel name for canvases config"); + options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS"); + options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0), + "Minimal time between two publishing"); + options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0), + "Maximal time between two publishing"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceDigiEventSink(); } diff --git a/MQ/mcbm/startBuildRawEvents2021.sh.in b/MQ/mcbm/startBuildRawEvents2021.sh.in index 9fd073d254..3718ef3d7e 100755 --- a/MQ/mcbm/startBuildRawEvents2021.sh.in +++ b/MQ/mcbm/startBuildRawEvents2021.sh.in @@ -174,15 +174,15 @@ while (( _iMoni < _nbmoni )); do EVTBUILDER+=" --PubTimeMax $_pubmaxsec" EVTBUILDER+=" --FillHistos true" EVTBUILDER+=" --IgnTsOver false" - EVTBUILDER+=" --EvtOverMode NoOverlap" + EVTBUILDER+=" --EvtOverMode AllowOverlap" EVTBUILDER+=" --RefDet kTof" EVTBUILDER+=" --DelDet kT0" EVTBUILDER+=" --DelDet kMuch" - EVTBUILDER+=" --SetTrigWin kSts,-50,100" - EVTBUILDER+=" --SetTrigWin kTrd,-250,100" - EVTBUILDER+=" --SetTrigWin kTof,-1,100" # To get T0 Digis (seed + close-by digis) in the event - EVTBUILDER+=" --SetTrigWin kRich,-150,20" - EVTBUILDER+=" --SetTrigWin kPsd,-50,10" + EVTBUILDER+=" --SetTrigWin kSts,-100,100" + EVTBUILDER+=" --SetTrigWin kTrd,-250,250" + EVTBUILDER+=" --SetTrigWin kTof,-150,150" # To get T0 Digis (seed + close-by digis) in the event + EVTBUILDER+=" --SetTrigWin kRich,-100,100" + EVTBUILDER+=" --SetTrigWin kPsd,-100,100" EVTBUILDER+=" --SetTrigMinNb kSts,0" EVTBUILDER+=" --SetTrigMinNb kTrd,0" EVTBUILDER+=" --SetTrigMinNb kTof,6" @@ -212,9 +212,10 @@ while (( _iMoni < _nbmoni )); do done -EVTSINK="McbmEventSink" +EVTSINK="DigiEventSink" EVTSINK+=" --id evtsink1" EVTSINK+=" --severity info" +#EVTSINK+=" --StoreFullTs 1" EVTSINK+=" --OutFileName mcbm_digis_events.root" EVTSINK+=" --FillHistos false" EVTSINK+=" --PubFreqTs $_pubfreqts" diff --git a/MQ/source/CbmMQTsaMultiSampler.cxx b/MQ/source/CbmMQTsaMultiSampler.cxx index 3564ccfe3f..e8a9eeb877 100644 --- a/MQ/source/CbmMQTsaMultiSampler.cxx +++ b/MQ/source/CbmMQTsaMultiSampler.cxx @@ -478,7 +478,7 @@ bool CbmMQTsaMultiSampler::ConditionalRun() } // if( 0 < fuPublishFreqTs ) /// Missed TS detection (only if output channel name defined by user) - if ((uTsIndex != (fuPrevTsIndex + 1)) && (0 != fuPrevTsIndex && 0 != uTsIndex)) { + if ((uTsIndex != (fuPrevTsIndex + 1)) && !(0 == fuPrevTsIndex && 0 == uTsIndex)) { LOG(info) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex << " New TS Index is " << uTsIndex << " diff is " << uTsIndex - fuPrevTsIndex << " Missing are " << uTsIndex - fuPrevTsIndex - 1; @@ -505,7 +505,7 @@ bool CbmMQTsaMultiSampler::ConditionalRun() fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fuPrevTsIndex - 1); } // if( 0 < fuPublishFreqTs ) - } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && ( 0 != fuPrevTsIndex && 0 != uTsIndex ) ) + } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && !( 0 == fuPrevTsIndex && 0 == uTsIndex ) ) if (0 < fuPublishFreqTs) { fhMissedTS->Fill(0); -- GitLab