/* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt SPDX-License-Identifier: GPL-3.0-only Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */ /** * CbmMQTsaMultiSampler.cpp * * @since 2017-11-17 * @author F. Uhlig */ #include "CbmMQTsaMultiSampler.h" #include "CbmFlesCanvasTools.h" #include "CbmFormatDecHexPrintout.h" #include "TimesliceInputArchive.hpp" #include "TimesliceMultiInputArchive.hpp" #include "TimesliceMultiSubscriber.hpp" #include "TimesliceSubscriber.hpp" #include "FairMQLogger.h" #include "FairMQProgOptions.h" // device->fConfig #include <TCanvas.h> #include <TH1F.h> #include <TH1I.h> #include <TProfile.h> #include "BoostSerializer.h" #include <boost/algorithm/string.hpp> #include <boost/archive/binary_oarchive.hpp> #include <boost/filesystem.hpp> #include <boost/regex.hpp> #include <boost/serialization/utility.hpp> #include "RootSerializer.h" namespace filesys = boost::filesystem; #include <thread> // this_thread::sleep_for #include <algorithm> #include <chrono> #include <ctime> #include <iomanip> #include <sstream> #include <string> #include <stdio.h> using namespace std; #include <stdexcept> struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; }; CbmMQTsaMultiSampler::CbmMQTsaMultiSampler() : FairMQDevice() , fMaxTimeslices(0) , fFileName("") , fDirName("") , fInputFileList() , fFileCounter(0) , fHost("") , fPort(0) , fHighWaterMark(1) , fTSCounter(0) , fMessageCounter(0) , fSource(nullptr) , fTime() , fLastPublishTime {std::chrono::system_clock::now()} { } void CbmMQTsaMultiSampler::InitTask() try { // Get the values from the command line options (via fConfig) fFileName = fConfig->GetValue<string>("filename"); fDirName = fConfig->GetValue<string>("dirname"); fHost = fConfig->GetValue<string>("flib-host"); fPort = fConfig->GetValue<uint64_t>("flib-port"); fHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark"); fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices"); fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts"); fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid"); fbSendTsPerChannel = fConfig->GetValue<bool>("send-ts-per-channel"); fsChannelNameMissedTs = fConfig->GetValue<std::string>("ChNameMissTs"); fsChannelNameCommands = fConfig->GetValue<std::string>("ChNameCmds"); fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin"); fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg"); fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg"); if (fbNoSplitTs) { if (fbSendTsPerSysId) { if (fbSendTsPerChannel) { LOG(warning) << "Both no-split-ts, send-ts-per-sysid and " "send-ts-per-channel options used => " << " second and third one will be ignored!!!!"; } // if( fbSendTsPerSysId ) else LOG(warning) << "Both no-split-ts and send-ts-per-sysid options used => " << " second one will be ignored!!!!"; } // if( fbSendTsPerSysId ) else if (fbSendTsPerChannel) { LOG(warning) << "Both no-split-ts and send-ts-per-channel options used => " << " second one will be ignored!!!!"; } // else if( fbSendTsPerSysId ) of if( fbSendTsPerSysId ) } // if( fbNoSplitTs ) else if (fbSendTsPerSysId && fbSendTsPerChannel) { LOG(warning) << "Both send-ts-per-sysid and send-ts-per-channel options used => " << " second one will be ignored!!!!"; } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs ) /// Extract SysId and channel information if provided in the binary options std::vector<std::string> vSysIdChanPairs = fConfig->GetValue<std::vector<std::string>>("sysid-chan"); for (uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair) { const size_t sep = vSysIdChanPairs[uPair].find(':'); if (string::npos == sep || 0 == sep || vSysIdChanPairs[uPair].size() == sep) { LOG(info) << vSysIdChanPairs[uPair]; throw InitTaskError("Provided pair of SysId + Channel name is missing a : or an argument."); } // if( string::npos == sep || 0 == sep || vSysIdChanPairs[ uPair ].size() == sep ) /// Extract SysId std::string sSysId = vSysIdChanPairs[uPair].substr(0, sep); const size_t hexPos = sSysId.find("0x"); int iSysId; if (string::npos == hexPos) iSysId = std::stoi(sSysId); else iSysId = std::stoi(sSysId.substr(hexPos + 2), nullptr, 16); /// Extract Channel name std::string sChannelName = vSysIdChanPairs[uPair].substr(sep + 1); /// Look if SysId is already defined const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId); if (fSysId.end() != pos) { /// SysId already there, redefine the corresponding channel name const vector<std::string>::size_type idx = pos - fSysId.begin(); fAllowedChannels[idx] = sChannelName; } // if( fSysId.end() != pos ) else { /// SysId unknown yet, add both SysId and channe name at end of respective vectors fSysId.push_back(iSysId); fAllowedChannels.push_back(sChannelName); } // else of if( fSysId.end() != pos ) LOG(info) << vSysIdChanPairs[uPair] << " " << iSysId << " " << sChannelName; } // for( uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair ) if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX; // Check which input is defined // Posibilities // filename && ! dirname : single file // filename with wildcards && diranme : all files with filename regex in the directory // host && port : connect to the flim server bool isGoodInputCombi {false}; if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) { isGoodInputCombi = true; fInputFileList.push_back(fFileName); } else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) { isGoodInputCombi = true; fInputFileList.push_back(fFileName); } else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) { isGoodInputCombi = true; LOG(info) << "Host: " << fHost; LOG(info) << "Port: " << fPort; } else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 == fPort) { isGoodInputCombi = true; LOG(info) << "Host string: " << fHost; } else { isGoodInputCombi = false; } if (!isGoodInputCombi) { throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory " "or host + port are allowed combination."); } LOG(info) << "MaxTimeslices: " << fMaxTimeslices; // Get the information about created channels from the device // Check if the defined channels from the topology (by name) // are in the list of channels which are possible/allowed // for the device // The idea is to check at initilization if the devices are // properly connected. For the time beeing this is done with a // nameing convention. It is not avoided that someone sends other // data on this channel. int noChannel = fChannels.size(); LOG(info) << "Number of defined output channels: " << noChannel; for (auto const& entry : fChannels) { /// Catches and ignores the channels for missing TS indices and commands /// Same for the histogram channels if (entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands || (0 < fuPublishFreqTs && (entry.first == fsChannelNameHistosInput || entry.first == fsChannelNameHistosConfig || entry.first == fsChannelNameCanvasConfig))) { continue; } // if( entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands || histo channels name) LOG(info) << "Channel name: " << entry.first; if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match."); } for (auto const& value : fComponentsToSend) { LOG(info) << "Value : " << value; if (value > 1) { throw InitTaskError("Sending same data to more than one output channel " "not implemented yet."); } } if (0 == fFileName.size() && 0 != fHost.size() && 0 != fPort) { // Don't add the protocol since this is done now in the TimesliceMultiSubscriber //std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort); std::string connector = fHost + ":" + std::to_string(fPort); LOG(info) << "Open TSPublisher at " << connector; fSource = new fles::TimesliceMultiSubscriber(connector); } else if (0 == fFileName.size() && 0 != fHost.size()) { std::string connector = fHost; LOG(info) << "Open TSPublisher with host string: " << connector; fSource = new fles::TimesliceMultiSubscriber(connector, fHighWaterMark); } else { // Create a ";" separated string with all file names std::string fileList {""}; for (const auto& obj : fInputFileList) { std::string fileName = obj; fileList += fileName; fileList += ";"; } fileList.pop_back(); // Remove the last ; LOG(info) << "Input File String: " << fileList; fSource = new fles::TimesliceMultiInputArchive(fileList, fDirName); if (!fSource) { throw InitTaskError("Could open files from file list."); } } LOG(info) << "High-Water Mark: " << fHighWaterMark; LOG(info) << "Max. Timeslices: " << fMaxTimeslices; if (fbNoSplitTs) { LOG(info) << "Sending TS copies in no-split mode"; } // if( fbNoSplitTs ) else if (fbSendTsPerSysId) { LOG(info) << "Sending components in separate TS per SysId"; } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs else if (fbSendTsPerChannel) { LOG(info) << "Sending components in separate TS per channel"; } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs ) fTime = std::chrono::steady_clock::now(); } catch (InitTaskError& e) { LOG(error) << e.what(); ChangeState(fair::mq::Transition::ErrorFound); } bool CbmMQTsaMultiSampler::IsChannelNameAllowed(std::string channelName) { /// If sending full TS, accept any name! if (fbNoSplitTs) { fComponentsToSend[0]++; fChannelsToSend[0].push_back(channelName); return true; } // if( fbNoSplitTs ) bool bFoundMatch = false; // for(auto const &entry : fAllowedChannels) { for (uint32_t idx = 0; idx < fAllowedChannels.size(); ++idx) { auto const& entry = fAllowedChannels[idx]; LOG(info) << "Looking for name " << channelName << " in " << entry; std::size_t pos1 = channelName.find(entry); if (pos1 != std::string::npos) { /* const vector<std::string>::const_iterator pos = std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry); const vector<std::string>::size_type idx = pos-fAllowedChannels.begin(); */ LOG(info) << "Found " << entry << " in " << channelName; LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx << " (SysId 0x" << std::hex << fSysId[idx] << std::dec << ")"; fComponentsToSend[idx]++; fChannelsToSend[idx].push_back(channelName); /// If sending per channel, do not stop the loop as we allow more than 1 comp type per channel if (fbSendTsPerChannel) bFoundMatch = true; else return true; } // if (pos1!=std::string::npos) } /// If sending per channel, do not stop the loop but still check if at least 1 match found if (fbSendTsPerChannel && bFoundMatch) return true; LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names."; LOG(error) << "Stop device."; return false; } bool CbmMQTsaMultiSampler::InitHistograms() { LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs; LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime; LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime; /// Vector of pointers on each histo (+ optionally desired folder) std::vector<std::pair<TNamed*, std::string>> vHistos = {}; /// Vector of pointers on each canvas (+ optionally desired folder) std::vector<std::pair<TCanvas*, std::string>> vCanvases = {}; /// Histos creation and obtain pointer on them fhTsRate = new TH1I("TsRate", "TS rate; t [s]", 1800, 0., 1800.); fhTsSize = new TH1I("TsSize", "Size of TS; Size [MB]", 15000, 0., 15000.); fhTsSizeEvo = new TProfile("TsSizeEvo", "Evolution of the TS Size; t [s]; Mean size [MB]", 1800, 0., 1800.); fhTsMaxSizeEvo = new TH1F("TsMaxSizeEvo", "Evolution of maximal TS Size; t [s]; Max size [MB]", 1800, 0., 1800.); fhMissedTS = new TH1I("Missed_TS", "Missed TS", 2, -0.5, 1.5); fhMissedTSEvo = new TProfile("Missed_TS_Evo", "Missed TS evolution; t [s]", 1800, 0., 1800.); /// Add histo pointers to the histo vector vHistos.push_back(std::pair<TNamed*, std::string>(fhTsRate, "Sampler")); vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSize, "Sampler")); vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSizeEvo, "Sampler")); vHistos.push_back(std::pair<TNamed*, std::string>(fhTsMaxSizeEvo, "Sampler")); vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTS, "Sampler")); vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTSEvo, "Sampler")); /// Canvases creation Double_t w = 10; Double_t h = 10; fcSummary = new TCanvas("cSampSummary", "Sampler monitoring plots", w, h); fcSummary->Divide(2, 3); fcSummary->cd(1); gPad->SetGridx(); gPad->SetGridy(); fhTsRate->Draw("hist"); fcSummary->cd(2); gPad->SetGridx(); gPad->SetGridy(); gPad->SetLogx(); gPad->SetLogy(); fhTsSize->Draw("hist"); fcSummary->cd(3); gPad->SetGridx(); gPad->SetGridy(); fhTsSizeEvo->Draw("hist"); fcSummary->cd(4); gPad->SetGridx(); gPad->SetGridy(); fhTsMaxSizeEvo->Draw("hist"); fcSummary->cd(5); gPad->SetGridx(); gPad->SetGridy(); fhMissedTS->Draw("hist"); fcSummary->cd(6); gPad->SetGridx(); gPad->SetGridy(); fhMissedTSEvo->Draw("el"); /// Add canvas pointers to the canvas vector vCanvases.push_back(std::pair<TCanvas*, std::string>(fcSummary, "canvases")); /// Add pointers to each histo in the histo array /// Create histo config vector /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder > /// and send it through a separate channel using the BoostSerializer for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) { // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName() // << " in " << vHistos[ uHisto ].second.data() // ; fArrayHisto.Add(vHistos[uHisto].first); std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second); fvpsHistosFolder.push_back(psHistoConfig); /// Serialize the vector of histo config into a single MQ message FairMQMessagePtr messageHist(NewMessage()); // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, psHistoConfig); BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig); /// Send message to the common histogram config messages queue if (Send(messageHist, fsChannelNameHistosConfig) < 0) { LOG(fatal) << "Problem sending histo config"; } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 ) LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data(); } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto ) /// Create canvas config vector /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config > /// and send it through a separate channel using the BoostSerializer for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) { // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName() // << " in " << vCanvases[ uCanv ].second.data(); std::string sCanvName = (vCanvases[uCanv].first)->GetName(); std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first); std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf); fvpsCanvasConfig.push_back(psCanvConfig); /// Serialize the vector of canvas config into a single MQ message FairMQMessagePtr messageCan(NewMessage()); // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, psCanvConfig); BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig); /// Send message to the common canvas config messages queue if (Send(messageCan, fsChannelNameCanvasConfig) < 0) { LOG(fatal) << "Problem sending canvas config"; } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 ) LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data(); } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv ) return true; } bool CbmMQTsaMultiSampler::ConditionalRun() { if (0 < fuPublishFreqTs && 0 == fTSCounter) { InitHistograms(); } // if( 0 < fuPublishFreqTs ) /// initialize the source (connect to emitter, ...) if (0 == fTSCounter && nullptr != dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)) { dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)->InitTimesliceSubscriber(); } // if( 0 == fTSCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber >(fSource) ) auto timeslice = fSource->get(); if (timeslice) { if (fTSCounter < fMaxTimeslices) { fTSCounter++; const fles::Timeslice& ts = *timeslice; uint64_t uTsIndex = ts.index(); if (0 < fuPublishFreqTs) { uint64_t uTsTime = ts.descriptor(0, 0).idx; if (0 == fuStartTime) { fuStartTime = uTsTime; } // if( 0 == fuStartTime ) fdTimeToStart = static_cast<double_t>(uTsTime - fuStartTime) / 1e9; uint64_t uSizeMb = 0; for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) { uSizeMb += ts.size_component(uComp) / (1024 * 1024); } // for( uint_t uComp = 0; uComp < ts.num_components(); ++uComp ) fhTsRate->Fill(fdTimeToStart); fhTsSize->Fill(uSizeMb); fhTsSizeEvo->Fill(fdTimeToStart, uSizeMb); /// Fill max size per s (assumes the histo binning is 1 second!) if (0. == fdLastMaxTime) { fdLastMaxTime = fdTimeToStart; fdTsMaxSize = uSizeMb; } // if( 0. == fdLastMaxTime ) else if (1. <= fdTimeToStart - fdLastMaxTime) { fhTsMaxSizeEvo->Fill(fdLastMaxTime, fdTsMaxSize); fdLastMaxTime = fdTimeToStart; fdTsMaxSize = uSizeMb; } // else if if( 1 <= fdTimeToStart - fdLastMaxTime ) else if (fdTsMaxSize < uSizeMb) { fdTsMaxSize = uSizeMb; } // else if( fdTsMaxSize < uSizeMb ) } // if( 0 < fuPublishFreqTs ) /// Missed TS detection (only if output channel name defined by user) if ((uTsIndex != (fuPrevTsIndex + 1)) && !(0 == fuPrevTsIndex && 0 == uTsIndex)) { LOG(info) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex << " New TS Index is " << uTsIndex << " diff is " << uTsIndex - fuPrevTsIndex << " Missing are " << uTsIndex - fuPrevTsIndex - 1; if ("" != fsChannelNameMissedTs) { /// Add missing TS indices to a vector and send it in appropriate channel std::vector<uint64_t> vulMissedIndices; for (uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) { vulMissedIndices.emplace_back(ulMiss); } // for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss ) if (!SendMissedTsIdx(vulMissedIndices)) { /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending a STOP to let all slaves finish processing previous data std::this_thread::sleep_for(std::chrono::milliseconds(1000)); SendCommand("STOP"); } // if( "" != fsChannelNameCommands ) return false; } // if( !SendMissedTsIdx( vulMissedIndices ) ) } // if( "" != fsChannelNameMissedTs ) if (0 < fuPublishFreqTs) { fhMissedTS->Fill(1, uTsIndex - fuPrevTsIndex - 1); fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fuPrevTsIndex - 1); } // if( 0 < fuPublishFreqTs ) } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && !( 0 == fuPrevTsIndex && 0 == uTsIndex ) ) if (0 < fuPublishFreqTs) { fhMissedTS->Fill(0); fhMissedTSEvo->Fill(fdTimeToStart, 0, 1); } // else if( 0 < fuPublishFreqTs ) fuPrevTsIndex = uTsIndex; if (fTSCounter % 10000 == 0) { LOG(info) << "Received TS " << fTSCounter << " with index " << uTsIndex; } LOG(debug) << "Found " << ts.num_components() << " different components in timeslice"; // CheckTimeslice(ts); if (fbNoSplitTs) { /// This is a special case for the TOF + Bmon /// => Inefficient as copy the TS as many times as need! if (!CreateAndSendFullTs(ts)) { /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending a STOP to let all slaves finish processing previous data std::this_thread::sleep_for(std::chrono::milliseconds(1000)); SendCommand("STOP"); } // if( "" != fsChannelNameCommands ) return false; } // if( !CreateAndSendFullTs( ts ) ) } // if( fbNoSplitTs ) else if (fbSendTsPerSysId) { /// This assumes that the order of the components does NOT change after the first TS /// That should be the case as the component index correspond to a physical link idx if (!CreateAndCombineComponentsPerSysId(ts)) { /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending a STOP to let all slaves finish processing previous data std::this_thread::sleep_for(std::chrono::milliseconds(1000)); SendCommand("STOP"); } // if( "" != fsChannelNameCommands ) return false; } // if( !CreateAndCombineComponentsPerSysId( ts ) ) } // else if( fbSendTsPerSysId ) of if( fbNoSplitTs ) else if (fbSendTsPerChannel) { /// This assumes that the order of the components does NOT change after the first TS /// That should be the case as the component index correspond to a physical link idx if (!CreateAndCombineComponentsPerChannel(ts)) { /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending a STOP to let all slaves finish processing previous data std::this_thread::sleep_for(std::chrono::milliseconds(1000)); SendCommand("STOP"); } // if( "" != fsChannelNameCommands ) return false; } // if( !CreateAndCombineComponentsPerChannel( ts ) ) } // else if( fbSendTsPerChannel ) of if( fbSendTsPerSysId ) else { for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) { if (!CreateAndSendComponent(ts, nrComp)) { /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending a STOP to let all slaves finish processing previous data std::this_thread::sleep_for(std::chrono::milliseconds(1000)); SendCommand("STOP"); } // if( "" != fsChannelNameCommands ) return false; } // if( !CreateAndSendComponent(ts, nrComp) ) } // for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) } // else of if( fbSendTsPerSysId ) if (0 < fuPublishFreqTs) { /// Send histograms periodically. /// Use also runtime checker to trigger sending after M s if /// processing too slow or delay sending if processing too fast std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now(); std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime; if ((fdMaxPublishTime < elapsedSeconds.count()) || (0 == fTSCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) { SendHistograms(); fLastPublishTime = std::chrono::system_clock::now(); } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fTSCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) ) } // if( 0 < fuPublishFreqTs ) return true; } // if (fTSCounter < fMaxTimeslices) else { CalcRuntime(); /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending an EOF to let all slaves finish processing previous data std::this_thread::sleep_for(std::chrono::seconds(10)); std::string sCmd = "EOF "; sCmd += FormatDecPrintout(fuPrevTsIndex); sCmd += " "; sCmd += FormatDecPrintout(fTSCounter); SendCommand(sCmd); } // if( "" != fsChannelNameCommands ) return false; } // else of if (fTSCounter < fMaxTimeslices) } // if (timeslice) else { CalcRuntime(); /// If command channel defined, send command to all "slaves" if ("" != fsChannelNameCommands) { /// Wait 1 s before sending an EOF to let all slaves finish processing previous data std::this_thread::sleep_for(std::chrono::seconds(10)); std::string sCmd = "EOF "; sCmd += FormatDecPrintout(fuPrevTsIndex); sCmd += " "; sCmd += FormatDecPrintout(fTSCounter); SendCommand(sCmd); } // if( "" != fsChannelNameCommands ) return false; } // else of if (timeslice) } bool CbmMQTsaMultiSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp) { // Check if component has to be send. If the corresponding channel // is connected create the new timeslice and send it to the // correct channel LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id); const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id)); if (pos != fSysId.end()) { const vector<std::string>::size_type idx = pos - fSysId.begin(); if (fComponentsToSend[idx] > 0) { LOG(debug) << "Create timeslice component for link " << nrComp; fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()}; component.append_component(ts.num_microslices(0)); for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) { component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m)); } /* LOG(info) << "Number of core microslices before: " << ts.num_core_microslices(); LOG(info) << "Number of core microslices after : " << component.num_core_microslices(); LOG(info) << "Number of microslices: " << component.num_microslices(0); */ if (!SendData(component, idx)) return false; return true; } } return true; } bool CbmMQTsaMultiSampler::CreateAndCombineComponentsPerSysId(const fles::Timeslice& ts) { /// First build the list of components for each SysId if it was not already done if (false == fbListCompPerSysIdReady) { for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) { uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id; const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usSysId); if (fSysId.end() != pos) { const vector<std::string>::size_type idx = pos - fSysId.begin(); fvvCompPerSysId[idx].push_back(uCompIdx); } // if( fSysId.end() != pos ) } // for( uint32_t uNrComp = 0; uNrComp < ts.num_components(); ++uNrComp ) for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) { std::stringstream ss; ss << "Found " << std::setw(2) << fvvCompPerSysId[uSysIdx].size() << " components for SysId 0x" << std::hex << std::setw(2) << fSysId[uSysIdx] << std::dec << " :"; for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) { ss << " " << std::setw(3) << fvvCompPerSysId[uSysIdx][uComp]; } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp ) LOG(info) << ss.str(); } // for( uint32_t uSysId = 0; uSysId < fSysId.size(); ++uSysId ) fbListCompPerSysIdReady = true; } // if( false == fbListCompPerSysIdReady ) /// Then loop on all possible SysId and send TS with their respective components if needed for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) { if (0 < fComponentsToSend[uSysIdx]) { LOG(debug) << "Create timeslice with components for SysId " << std::hex << fSysId[uSysIdx] << std::dec; if (0 < fvvCompPerSysId[uSysIdx].size()) { fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()}; for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) { uint32_t uNumMsInComp = ts.num_microslices(fvvCompPerSysId[uSysIdx][uComp]); component.append_component(uNumMsInComp); LOG(debug) << "Add components to TS for SysId " << std::hex << fSysId[uSysIdx] << std::dec << " TS " << ts.index() << " Comp " << fvvCompPerSysId[uSysIdx][uComp]; for (size_t m = 0; m < uNumMsInComp; ++m) { component.append_microslice(uComp, m, ts.descriptor(fvvCompPerSysId[uSysIdx][uComp], m), ts.content(fvvCompPerSysId[uSysIdx][uComp], m)); } // for( size_t m = 0; m < uNumMsInComp; ++m ) } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp ) LOG(debug) << "Prepared timeslice for SysId " << std::hex << fSysId[uSysIdx] << std::dec << " with " << component.num_components() << " components"; if (!SendData(component, uSysIdx)) return false; } // if( 0 < fvvCompPerSysId[ uSysIdx ].size() ) } // if( 0 < fComponentsToSend[ uSysIdx ] ) } // for( uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx ) return true; } bool CbmMQTsaMultiSampler::CreateAndCombineComponentsPerChannel(const fles::Timeslice& ts) { /// First build the list of components for each channel name if it was not already done if (false == fbListCompPerChannelReady) { /// First add each channel enabled for sending to the list of channels we will use for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) { if (0 < fComponentsToSend[uSysIdx]) { for (uint32_t uChan = 0; uChan < fChannelsToSend[uSysIdx].size(); ++uChan) { const vector<std::string>::const_iterator pos = std::find(fvChannelsToSend.begin(), fvChannelsToSend.end(), fChannelsToSend[uSysIdx][uChan]); if (fvChannelsToSend.end() == pos) { fvChannelsToSend.push_back(fChannelsToSend[uSysIdx][uChan]); fvvCompPerChannel.push_back(std::vector<uint32_t>()); } } // for( uChan = 0; uChan < fChannelsToSend[ uSysIdx ].size(); ++ uChan ) } // if( 0 < fComponentsToSend[ uSysIdx ] ) } // for( uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx ) /// Now resize the vector in which we will store fo each sending channel the list of components fvvCompPerChannel.resize(fvChannelsToSend.size()); /// Check for each component if its system is enabled and if the name of its channel(s) is in the list /// If yes, add it to the vector of the corresponding channel for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) { uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id; const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usSysId); if (fSysId.end() != pos) { const vector<std::string>::size_type idxSys = pos - fSysId.begin(); if (0 < fComponentsToSend[idxSys]) { for (uint32_t uChan = 0; uChan < fChannelsToSend[idxSys].size(); ++uChan) { const vector<std::string>::const_iterator posCh = std::find(fvChannelsToSend.begin(), fvChannelsToSend.end(), fChannelsToSend[idxSys][uChan]); if (fvChannelsToSend.end() != posCh) { const vector<std::string>::size_type idxChan = posCh - fvChannelsToSend.begin(); fvvCompPerChannel[idxChan].push_back(uCompIdx); } // if( fvChannelsToSend.end() != posCh ) } // for( uChan = 0; uChan < fChannelsToSend[ idxSys ].size(); ++ uChan ) } // if( 0 < fComponentsToSend[ uSysIdx ] ) } // if( fSysId.end() != pos ) } // for( uint32_t uNrComp = 0; uNrComp < ts.num_components(); ++uNrComp ) for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) { std::stringstream ss; ss << "Found " << std::setw(2) << fvvCompPerChannel[uChanIdx].size() << " components for channel " << fvChannelsToSend[uChanIdx] << " :"; for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size(); ++uComp) { ss << " " << std::setw(3) << fvvCompPerChannel[uChanIdx][uComp]; } // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp ) LOG(info) << ss.str(); } // for( uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx ) fbListCompPerChannelReady = true; } // if( false == fbListCompPerSysIdReady ) /// Loop on channels /// Loop on possible SysId and check channels /// Then loop on all possible channels and send TS with their respective components if needed for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) { LOG(debug) << "Create timeslice with components for channel " << fvChannelsToSend[uChanIdx]; if (0 < fvvCompPerChannel[uChanIdx].size()) { fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()}; for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size(); ++uComp) { uint32_t uNumMsInComp = ts.num_microslices(fvvCompPerChannel[uChanIdx][uComp]); component.append_component(uNumMsInComp); LOG(debug) << "Add components to TS for SysId " << std::hex << static_cast<uint16_t>(ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], 0).sys_id) << std::dec << " TS " << ts.index() << " Comp " << fvvCompPerChannel[uChanIdx][uComp]; for (size_t m = 0; m < uNumMsInComp; ++m) { component.append_microslice(uComp, m, ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], m), ts.content(fvvCompPerChannel[uChanIdx][uComp], m)); } // for( size_t m = 0; m < uNumMsInComp; ++m ) } // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp ) LOG(debug) << "Prepared timeslice for channel " << fvChannelsToSend[uChanIdx] << " with " << component.num_components() << " components"; if (!SendData(component, fvChannelsToSend[uChanIdx])) return false; } // if( 0 < fvvCompPerSysId[ uSysIdx ].size() ) } // for( uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx ) return true; } bool CbmMQTsaMultiSampler::CreateAndSendFullTs(const fles::Timeslice& ts) { /// Send full TS to all enabled channels for (uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx) { if (0 < fComponentsToSend[uChanIdx]) { LOG(debug) << "Copy timeslice component for channel " << fChannelsToSend[uChanIdx][0]; fles::StorableTimeslice fullTs {ts}; if (!SendData(fullTs, uChanIdx)) return false; } // if( 0 < fComponentsToSend[ uChanIdx ] ) } // for( uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx ) return true; } bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, int idx) { // serialize the timeslice and create the message std::stringstream oss; boost::archive::binary_oarchive oa(oss); oa << component; std::string* strMsg = new std::string(oss.str()); FairMQMessagePtr msg(NewMessage( const_cast<char*>(strMsg->c_str()), // data strMsg->length(), // size [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, strMsg)); // object that manages the data // TODO: Implement sending same data to more than one channel // Need to create new message (copy message??) if (fComponentsToSend[idx] > 1) { LOG(info) << "Need to copy FairMessage"; } // in case of error or transfer interruption, // return false to go to IDLE state // successfull transfer will return number of bytes // transfered (can be 0 if sending an empty message). LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0]; if (Send(msg, fChannelsToSend[idx][0]) < 0) { LOG(error) << "Problem sending data"; return false; } fMessageCounter++; LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize(); return true; } bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, std::string sChannel) { // serialize the timeslice and create the message std::stringstream oss; boost::archive::binary_oarchive oa(oss); oa << component; std::string* strMsg = new std::string(oss.str()); FairMQMessagePtr msg(NewMessage( const_cast<char*>(strMsg->c_str()), // data strMsg->length(), // size [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, strMsg)); // object that manages the data // in case of error or transfer interruption, // return false to go to IDLE state // successfull transfer will return number of bytes // transfered (can be 0 if sending an empty message). LOG(debug) << "Send data to channel " << sChannel; if (Send(msg, sChannel) < 0) { LOG(error) << "Problem sending data"; return false; } fMessageCounter++; LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize(); return true; } bool CbmMQTsaMultiSampler::SendMissedTsIdx(std::vector<uint64_t> vIndices) { // serialize the vector and create the message std::stringstream oss; boost::archive::binary_oarchive oa(oss); oa << vIndices; std::string* strMsg = new std::string(oss.str()); FairMQMessagePtr msg(NewMessage( const_cast<char*>(strMsg->c_str()), // data strMsg->length(), // size [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, strMsg)); // object that manages the data // in case of error or transfer interruption, // return false to go to IDLE state // successfull transfer will return number of bytes // transfered (can be 0 if sending an empty message). LOG(debug) << "Send data to channel " << fsChannelNameMissedTs; if (Send(msg, fsChannelNameMissedTs) < 0) { LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameMissedTs; return false; } // if( Send( msg, fsChannelNameMissedTs ) < 0 ) return true; } bool CbmMQTsaMultiSampler::SendCommand(std::string sCommand) { // serialize the vector and create the message std::stringstream oss; boost::archive::binary_oarchive oa(oss); oa << sCommand; std::string* strMsg = new std::string(oss.str()); FairMQMessagePtr msg(NewMessage( const_cast<char*>(strMsg->c_str()), // data strMsg->length(), // size [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, strMsg)); // object that manages the data // FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data // sCommand.length(), // size // []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); }, // &sCommand ) ); // object that manages the data // in case of error or transfer interruption, // return false to go to IDLE state // successfull transfer will return number of bytes // transfered (can be 0 if sending an empty message). LOG(debug) << "Send data to channel " << fsChannelNameCommands; if (Send(msg, fsChannelNameCommands) < 0) { LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameCommands; return false; } // if( Send( msg, fsChannelNameMissedTs ) < 0 ) return true; } bool CbmMQTsaMultiSampler::SendHistograms() { /// Serialize the array of histos into a single MQ message FairMQMessagePtr message(NewMessage()); // Serialize<RootSerializer>(*message, &fArrayHisto); RootSerializer().Serialize(*message, &fArrayHisto); /// Send message to the common histogram messages queue if (Send(message, fsChannelNameHistosInput) < 0) { LOG(error) << "Problem sending data"; return false; } // if( Send( message, fsChannelNameHistosInput ) < 0 ) /// Reset the histograms after sending them (but do not reset the time) ResetHistograms(); return true; } bool CbmMQTsaMultiSampler::ResetHistograms() { fhTsRate->Reset(); fhTsSize->Reset(); fhTsSizeEvo->Reset(); fhTsMaxSizeEvo->Reset(); fhMissedTS->Reset(); fhMissedTSEvo->Reset(); return true; } CbmMQTsaMultiSampler::~CbmMQTsaMultiSampler() {} void CbmMQTsaMultiSampler::CalcRuntime() { std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime; LOG(info) << "Runtime: " << run_time.count(); LOG(info) << "No more input data"; } void CbmMQTsaMultiSampler::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc) { LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec; LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec; LOG(info) << "Equipement ID: " << mdsc.eq_id; LOG(info) << "Flags: " << mdsc.flags; LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec; LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec; LOG(info) << "Microslice Idx: " << mdsc.idx; LOG(info) << "Checksum: " << mdsc.crc; LOG(info) << "Size: " << mdsc.size; LOG(info) << "Offset: " << mdsc.offset; } bool CbmMQTsaMultiSampler::CheckTimeslice(const fles::Timeslice& ts) { if (0 == ts.num_components()) { LOG(error) << "No Component in TS " << ts.index(); return 1; } LOG(info) << "Found " << ts.num_components() << " different components in timeslice"; for (size_t c = 0; c < ts.num_components(); ++c) { LOG(info) << "Found " << ts.num_microslices(c) << " microslices in component " << c; LOG(info) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes"; LOG(info) << "Component " << c << " has the system id 0x" << std::hex << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec; LOG(info) << "Component " << c << " has the system id 0x" << static_cast<int>(ts.descriptor(c, 0).sys_id); /* for (size_t m = 0; m < ts.num_microslices(c); ++m) { PrintMicroSliceDescriptor(ts.descriptor(c,m)); } */ } return true; }