/* Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt SPDX-License-Identifier: GPL-3.0-only Authors: Pierre-Alain Loizeau [committer] */ /** * CbmDeviceMcbmMonitorPulser.cxx * * @since 2020-05-04 * @author P.-A Loizeau */ #include "CbmDeviceMcbmMonitorPulser.h" #include "CbmMQDefs.h" #include "TimesliceMetaData.h" //#include "CbmMcbm2018MonitorAlgoTof.h" #include "CbmFlesCanvasTools.h" #include "StorableTimeslice.hpp" #include "FairMQLogger.h" #include "FairMQProgOptions.h" // device->fConfig #include "FairParGenericSet.h" #include "TCanvas.h" #include "TFile.h" #include "TH1.h" #include "TList.h" #include "TNamed.h" #include "BoostSerializer.h" #include <boost/archive/binary_iarchive.hpp> #include <boost/serialization/utility.hpp> #include <array> #include <iomanip> #include <stdexcept> #include <string> #include "RootSerializer.h" struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; }; using namespace std; //Bool_t bMcbm2018MonitorTaskTofResetHistos = kFALSE; CbmDeviceMcbmMonitorPulser::CbmDeviceMcbmMonitorPulser() // : fMonitorAlgo{ new CbmMcbm2018MonitorAlgoTof() } { } void CbmDeviceMcbmMonitorPulser::InitTask() try { /// Read options from executable LOG(info) << "Init options for CbmMqStarHistoServer."; fbDebugMonitorMode = fConfig->GetValue<bool>("DebugMoni"); fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz"); fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin"); fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax"); fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg"); fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg"); fsAllowedChannels[0] = fsChannelNameDataInput; LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs; LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime; LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime; /// Set the Monitor Algo in Absolute time scale // fMonitorAlgo->UseAbsoluteTime(); // Get the information about created channels from the device // Check if the defined channels from the topology (by name) // are in the list of channels which are possible/allowed // for the device // The idea is to check at initilization if the devices are // properly connected. For the time beeing this is done with a // nameing convention. It is not avoided that someone sends other // data on this channel. //logger::SetLogLevel("INFO"); int noChannel = fChannels.size(); LOG(info) << "Number of defined channels: " << noChannel; for (auto const& entry : fChannels) { LOG(info) << "Channel name: " << entry.first; if (std::string::npos != entry.first.find(fsChannelNameDataInput)) { if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); OnData(entry.first, &CbmDeviceMcbmMonitorPulser::HandleData); } // if( std::string::npos != entry.first.find( fsChannelNameDataInput ) ) } // for( auto const &entry : fChannels ) InitContainers(); } catch (InitTaskError& e) { LOG(error) << e.what(); // Wrapper defined in CbmMQDefs.h to support different FairMQ versions cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); } bool CbmDeviceMcbmMonitorPulser::IsChannelNameAllowed(std::string channelName) { for (auto const& entry : fsAllowedChannels) { std::size_t pos1 = channelName.find(entry); if (pos1 != std::string::npos) { const vector<std::string>::const_iterator pos = std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry); const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin(); LOG(info) << "Found " << entry << " in " << channelName; LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx; return true; } // if (pos1!=std::string::npos) } // for(auto const &entry : fsAllowedChannels) LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names."; LOG(error) << "Stop device."; return false; } Bool_t CbmDeviceMcbmMonitorPulser::InitContainers() { LOG(info) << "Init parameter containers for CbmDeviceMcbmMonitorPulser."; Bool_t initOK = kTRUE; /* fParCList = fMonitorAlgo->GetParList(); for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) { FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) ); fParCList->Remove( tempObj ); std::string paramName{ tempObj->GetName() }; // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). // Should only be used for small data because of the cost of an additional copy // Her must come the proper Runid std::string message = paramName + ",111"; LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message; FairMQMessagePtr req( NewSimpleMessage(message) ); FairMQMessagePtr rep( NewMessage() ); FairParGenericSet* newObj = nullptr; if ( Send(req, "parameters") > 0 ) { if ( Receive( rep, "parameters" ) >= 0) { if ( rep->GetSize() != 0 ) { CbmMqTMessage tmsg( rep->GetData(), rep->GetSize() ); newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) ); LOG( info ) << "Received unpack parameter from the server:"; newObj->print(); } else { LOG( error ) << "Received empty reply. Parameter not available"; } // if (rep->GetSize() != 0) } // if (Receive(rep, "parameters") >= 0) } // if (Send(req, "parameters") > 0) fParCList->AddAt( newObj, iparC ); delete tempObj; } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) /// Need to add accessors for all options fMonitorAlgo->SetIgnoreOverlapMs( fbIgnoreOverlapMs ); fMonitorAlgo->SetDebugMonitorMode( fbDebugMonitorMode ); fMonitorAlgo->SetIgnoreCriticalErrors( fbIgnoreCriticalErrors ); fMonitorAlgo->SetHistoryHistoSize( fuHistoryHistoSize ); fMonitorAlgo->SetPulserTotLimits( fuMinTotPulser, fuMaxTotPulser ); Bool_t initOK = fMonitorAlgo->InitContainers(); */ // Bool_t initOK = fMonitorAlgo->ReInitContainers(); // CreateHistos(); /* /// Histos creation and obtain pointer on them /// Trigger histo creation on all associated algos initOK &= fMonitorAlgo->CreateHistograms(); /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder) std::vector< std::pair< TNamed *, std::string > > vHistos = fMonitorAlgo->GetHistoVector(); /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder) std::vector< std::pair< TCanvas *, std::string > > vCanvases = fMonitorAlgo->GetCanvasVector(); /// Add pointers to each histo in the histo array /// Create histo config vector /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > /// and send it through a separate channel using the BoostSerializer for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) { // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() // << " in " << vHistos[ uHisto ].second.data() // ; fArrayHisto.Add( vHistos[ uHisto ].first ); std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(), vHistos[ uHisto ].second ); fvpsHistosFolder.push_back( psHistoConfig ); /// Serialize the vector of histo config into a single MQ message FairMQMessagePtr messageHist( NewMessage() ); // Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig ); BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,psHistoConfig); /// Send message to the common histogram config messages queue if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) { LOG(error) << "Problem sending histo config"; return false; } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data() ; } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) /// Create canvas config vector /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > /// and send it through a separate channel using the BoostSerializer for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) { // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() // << " in " << vCanvases[ uCanv ].second.data(); std::string sCanvName = (vCanvases[ uCanv ].first)->GetName(); std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first ); std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf ); fvpsCanvasConfig.push_back( psCanvConfig ); /// Serialize the vector of canvas config into a single MQ message FairMQMessagePtr messageCan( NewMessage() ); // Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig ); BoostSerializer < std::pair< std::string, std::string > >().Serialize( *messageCan, psCanvConfig ); /// Send message to the common canvas config messages queue if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) { LOG(error) << "Problem sending canvas config"; return false; } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data() ; } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) */ return initOK; } // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) bool CbmDeviceMcbmMonitorPulser::HandleData(FairMQParts& parts, int /*index*/) { fulNumMessages++; LOG(debug) << "Received message " << fulNumMessages << " with " << parts.Size() << " parts" << ", size0: " << parts.At(0)->GetSize(); uint32_t uPartIdx = 0; /// TODO: code order of vectors in the TS MetaData!! /* std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ), ( parts.At( uPartIdx ) )->GetSize() ); std::istringstream issTsMeta(msgStrTsMeta); boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta); inputArchiveTsMeta >> (*fTsMetaData); ++uPartIdx; */ // Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData); RootSerializer().Deserialize(*parts.At(uPartIdx), fTsMetaData); ++uPartIdx; std::string msgStrBmon(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); std::istringstream issBmon(msgStrBmon); boost::archive::binary_iarchive inputArchiveBmon(issBmon); inputArchiveBmon >> fvDigiBmon; ++uPartIdx; std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); std::istringstream issSts(msgStrSts); boost::archive::binary_iarchive inputArchiveSts(issSts); inputArchiveSts >> fvDigiSts; ++uPartIdx; std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); std::istringstream issMuch(msgStrMuch); boost::archive::binary_iarchive inputArchiveMuch(issMuch); inputArchiveMuch >> fvDigiMuch; ++uPartIdx; std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); std::istringstream issTrd(msgStrTrd); boost::archive::binary_iarchive inputArchiveTrd(issTrd); inputArchiveTrd >> fvDigiTrd; ++uPartIdx; std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); std::istringstream issTof(msgStrTof); boost::archive::binary_iarchive inputArchiveTof(issTof); inputArchiveTof >> fvDigiTof; ++uPartIdx; std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); std::istringstream issRich(msgStrRich); boost::archive::binary_iarchive inputArchiveRich(issRich); inputArchiveRich >> fvDigiRich; ++uPartIdx; std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); std::istringstream issPsd(msgStrPsd); boost::archive::binary_iarchive inputArchivePsd(issPsd); inputArchivePsd >> fvDigiPsd; ++uPartIdx; /// Process data in Algo /// Clear vectors delete fTsMetaData; fvDigiBmon.clear(); fvDigiSts.clear(); fvDigiMuch.clear(); fvDigiTrd.clear(); fvDigiTof.clear(); fvDigiRich.clear(); fvDigiPsd.clear(); /* LOG(debug) << "Received message number "<< fulNumMessages << " with size " << msg->GetSize(); if( 0 == fulNumMessages % 10000 ) LOG(info) << "Received " << fulNumMessages << " messages"; std::string msgStr( static_cast<char*>( msg->GetData() ), msg->GetSize() ); std::istringstream iss( msgStr ); boost::archive::binary_iarchive inputArchive( iss ); /// Create an empty TS and fill it with the incoming message fles::StorableTimeslice component{ 0 }; inputArchive >> component; /// Process the Timeslice DoUnpack(component, 0); /// Send histograms each 100 time slices. Should be each ~1s /// Use also runtime checker to trigger sending after M s if /// processing too slow or delay sending if processing too fast std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) { SendHistograms(); fLastPublishTime = std::chrono::system_clock::now(); } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) */ return true; } bool CbmDeviceMcbmMonitorPulser::SendHistograms() { /// Serialize the array of histos into a single MQ message FairMQMessagePtr message(NewMessage()); // Serialize<RootSerializer>(*message, &fArrayHisto); RootSerializer().Serialize(*message, &fArrayHisto); // test code to check if deserialization works /* TObject* tempObject = nullptr; // Deserialize<RootDeserializer>(*message, tempObject); RootDeserializer().Deserialize(*message, tempObject); if (TString(tempObject->ClassName()).EqualTo("TObjArray")) { TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject); LOG(info) << "Array contains " << arrayHisto->GetEntriesFast() << " entries"; for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) { TObject* obj = arrayHisto->At(i); LOG(info) << obj->GetName(); TH1* histogram = static_cast<TH1*>(obj); LOG(info) << histogram->GetNbinsX(); } } */ /// Send message to the common histogram messages queue if (Send(message, fsChannelNameHistosInput) < 0) { LOG(error) << "Problem sending data"; return false; } // if( Send( message, fsChannelNameHistosInput ) < 0 ) /// Reset the histograms after sending them (but do not reset the time) // fMonitorAlgo->ResetHistograms( kFALSE ); return true; } CbmDeviceMcbmMonitorPulser::~CbmDeviceMcbmMonitorPulser() {} void CbmDeviceMcbmMonitorPulser::Finish() {}