Skip to content
Snippets Groups Projects
Commit 26080ec4 authored by Volker Friese's avatar Volker Friese Committed by Pierre-Alain Loizeau
Browse files

Revised CbmDevEventSink. Remove histogramming and timeslice buffering.

parent 803ed31c
No related branches found
No related tags found
1 merge request!860Revised CbmDevEventSink. Remove histogramming and timeslice buffering.
Pipeline #17899 passed
/* Copyright (C) 2020-2022 Facility for Antiproton and Ion Research in Europe, Darmstadt /* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer], Dominik Smith */ Authors: Dominik Smith [committer], Pierre-Alain Loizeau, Volker Friese */
#include "CbmDevEventSink.h" #include "CbmDevEventSink.h"
/// CBM headers // CBM headers
#include "CbmEvent.h"
#include "CbmFlesCanvasTools.h"
#include "CbmMQDefs.h" #include "CbmMQDefs.h"
#include "TimesliceMetaData.h" #include "TimesliceMetaData.h"
/// FAIRROOT headers // FAIRROOT headers
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig #include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "FairRootFileSink.h" #include "FairRootFileSink.h"
#include "FairRootManager.h" #include "FairRootManager.h"
#include "FairRunOnline.h" #include "FairRunOnline.h"
...@@ -23,568 +19,190 @@ ...@@ -23,568 +19,190 @@
#include "RootSerializer.h" #include "RootSerializer.h"
/// FAIRSOFT headers (geant, boost, ...) // External packages
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"
#include <boost/archive/binary_iarchive.hpp> #include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp> #include <boost/serialization/utility.hpp>
/// C/C++ headers /// C++ headers
#include <thread> // this_thread::sleep_for #include <thread> // this_thread::sleep_for
#include <array>
#include <iomanip>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
using std::istringstream;
using std::string;
using std::vector;
struct InitTaskError : std::runtime_error { struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error; using std::runtime_error::runtime_error;
}; };
using namespace std;
//Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE; // ----- Destructor -------------------------------------------------------
CbmDevEventSink::~CbmDevEventSink()
{
CbmDevEventSink::CbmDevEventSink() {} // Close things properly if not already done
if (!fFinishDone) Finish();
// Clear and delete members
if (fTsMetaData) delete fTsMetaData;
if (fEventVec != nullptr) {
fEventVec->clear();
delete fEventVec;
}
if (fFairRun) delete fFairRun;
}
// ----------------------------------------------------------------------------
// ----- Initialize -------------------------------------------------------
void CbmDevEventSink::InitTask() void CbmDevEventSink::InitTask()
try { try {
/// Read options from executable
LOG(info) << "Init options for CbmDevEventSink."; // Read options from executable
LOG(info) << "Init options for CbmDevEventSink";
fsOutputFileName = fConfig->GetValue<std::string>("OutFileName"); string outputFileName = fConfig->GetValue<std::string>("OutFileName");
string channelNameDataInput = fConfig->GetValue<std::string>("ChannelNameDataInput");
fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn"); string channelNameCommands = fConfig->GetValue<std::string>("ChannelNameCommands");
fsAllowedChannels[0] = fsChannelNameDataInput;
// --- Hook action on input channels
fbFillHistos = fConfig->GetValue<bool>("FillHistos"); OnData(channelNameDataInput, &CbmDevEventSink::HandleData);
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); OnData(channelNameCommands, &CbmDevEventSink::HandleCommand);
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax"); // --- Prepare ROOT output
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); // TODO: WE use FairRunOnline and FairRootManager to manage the output. There might be a more
// elegant way.
/// Associate the MissedTs Channel to the corresponding handler fTsMetaData = new TimesliceMetaData();
OnData(fsChannelNameMissedTs, &CbmDevEventSink::HandleMissTsData); fEventVec = new vector<CbmDigiEvent>();
if ("" != outputFileName) {
/// Associate the command Channel to the corresponding handler fFairRun = new FairRunOnline();
OnData(fsChannelNameCommands, &CbmDevEventSink::HandleCommand); fFairRootMgr = FairRootManager::Instance();
fFairRootMgr->SetSink(new FairRootFileSink(outputFileName));
/// Associate the Event + Unp data Channel to the corresponding handler if (nullptr == fFairRootMgr->GetOutFile()) throw InitTaskError("Could not open ROOT file");
// Get the information about created channels from the device }
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDevEventSink::HandleData);
} // if( entry.first.find( "ts" )
} // for( auto const &entry : fChannels )
// InitContainers();
/// Prepare storage TClonesArrays
/// TS MetaData storage
fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
if (NULL == fTimeSliceMetaDataArray) {
throw InitTaskError("Failed creating the TS meta data TClonesarray ");
} // if( NULL == fTimeSliceMetaDataArray )
/// Events storage
/// TODO: remove TObject from CbmEvent and switch to vectors!
fEventsSel = new std::vector<CbmDigiEvent>();
/// Prepare root output
if ("" != fsOutputFileName) {
fpRun = new FairRunOnline();
fpFairRootMgr = FairRootManager::Instance();
fpFairRootMgr->SetSink(new FairRootFileSink(fsOutputFileName));
if (nullptr == fpFairRootMgr->GetOutFile()) {
throw InitTaskError("Could not open root file");
} // if( nullptr == fpFairRootMgr->GetOutFile() )
} // if( "" != fsOutputFileName )
else { else {
throw InitTaskError("Empty output filename!"); throw InitTaskError("Empty output filename!");
} // else of if( "" != fsOutputFileName ) }
fFairRootMgr->InitSink();
LOG(info) << "Init Root Output to " << fsOutputFileName; fFairRootMgr->RegisterAny("TimesliceMetaData.", fTsMetaData, kTRUE);
fFairRootMgr->RegisterAny("DigiEvent", fEventVec, kTRUE);
fpFairRootMgr->InitSink(); fFairRootMgr->WriteFolder();
/// Register all input data members with the FairRoot manager LOG(info) << "Init ROOT Output to " << outputFileName;
/// TS MetaData
fpFairRootMgr->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kTRUE);
/// CbmEvent
fpFairRootMgr->RegisterAny("DigiEvent", fEventsSel, kTRUE);
fpFairRootMgr->WriteFolder();
LOG(info) << "Initialized outTree with rootMgr at " << fpFairRootMgr;
/// Histograms management
if (kTRUE == fbFillHistos) {
/// Comment to prevent clang format single lining
if (kFALSE == InitHistograms()) { throw InitTaskError("Failed to initialize the histograms."); }
} // if( kTRUE == fbFillHistos )
} }
catch (InitTaskError& e) { catch (InitTaskError& e) {
LOG(error) << e.what(); LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound); cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
} }
// ----------------------------------------------------------------------------
bool CbmDevEventSink::IsChannelNameAllowed(std::string channelName)
{
for (auto const& entry : fsAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
} // if (pos1!=std::string::npos)
} // for(auto const &entry : fsAllowedChannels)
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
bool CbmDevEventSink::InitHistograms()
{
/// Histos creation and obtain pointer on them
/// Trigger histo creation, filling vHistos and vCanvases
// bool initOK =CreateHistograms();
bool initOK = true;
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
// ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
std::vector<std::pair<TNamed*, std::string>> vHistos = {};
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
// ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add(vHistos[uHisto].first);
std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
fvpsHistosFolder.push_back(psHistoConfig);
LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[uCanv].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
fvpsCanvasConfig.push_back(psCanvConfig);
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
return initOK;
}
//--------------------------------------------------------------------// // ----- Finish execution -------------------------------------------------
// handler is called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and a sub-channel index (here 0) void CbmDevEventSink::Finish()
bool CbmDevEventSink::HandleMissTsData(FairMQMessagePtr& msg, int /*index*/)
{ {
std::vector<uint64_t> vIndices; fFairRootMgr->Write();
std::string msgStrMissTs(static_cast<char*>(msg->GetData()), msg->GetSize()); fFairRootMgr->CloseSink();
std::istringstream issMissTs(msgStrMissTs); LOG(info) << "File closed after " << fNumMessages << " and saving " << fNumTs << " TS";
boost::archive::binary_iarchive inputArchiveMissTs(issMissTs); LOG(info) << "Index of last processed timeslice: " << fPrevTsIndex, ChangeState(fair::mq::Transition::Stop);
inputArchiveMissTs >> vIndices; std::this_thread::sleep_for(std::chrono::milliseconds(3000));
ChangeState(fair::mq::Transition::End);
fvulMissedTsIndices.insert(fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end()); fFinishDone = true;
/// Check TS queue and process it if needed (in case it filled a hole!)
CheckTsQueues();
return true;
} }
//--------------------------------------------------------------------// // ----------------------------------------------------------------------------
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDevEventSink::HandleData(FairMQParts& parts, int /*index*/)
{
fulNumMessages++;
LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts"
<< ", size0: " << parts.At(0)->GetSize();
if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
/// Unpack the message
CbmEventTimeslice unpTs(parts);
/// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!!
LOG(debug) << "Next TS check " << fuPrevTsIndex << " " << fulTsCounter << " " << unpTs.fTsMetaData.GetIndex();
if (fuPrevTsIndex + 1 == unpTs.fTsMetaData.GetIndex()
|| (0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == unpTs.fTsMetaData.GetIndex())) {
LOG(debug) << "TS direct to dump";
/// Fill all storage variables registers for data output
PrepareTreeEntry(unpTs);
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = unpTs.fTsMetaData.GetIndex();
fulTsCounter++;
}
else {
LOG(debug) << "TS direct to storage";
/// If not consecutive to last TS sent,
fmFullTsStorage.emplace_hint(fmFullTsStorage.end(),
std::pair<uint64_t, CbmEventTimeslice>(unpTs.fTsMetaData.GetIndex(), unpTs));
}
LOG(debug) << "TS metadata checked";
/// Clear metadata => crashes, maybe not needed as due to move the pointer is invalidated?
// delete fTsMetaData;
/// Check TS queue and process it if needed (in case it filled a hole!)
CheckTsQueues();
LOG(debug) << "TS queues checked";
/// Histograms management
if (kTRUE == fbFillHistos) {
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
if (!fbConfigSent) {
// Send the configuration only once per run!
fbConfigSent = SendHistoConfAndData();
} // if( !fbConfigSent )
else
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
} // if( kTRUE == fbFillHistos )
LOG(debug) << "Processed TS with saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter
<< " full ones and " << fulMissedTsCounter << " missed/empty ones)";
LOG(debug) << "Buffers are " << fmFullTsStorage.size() << " full TS and " << fvulMissedTsIndices.size()
<< " missed/empty ones)";
LOG(debug) << "Buffers are " << fmFullTsStorage.size() << " full TS and " << fvulMissedTsIndices.size()
<< " missed/empty ones)";
return true; // ----- Handle command message -------------------------------------------
} bool CbmDevEventSink::HandleCommand(FairMQMessagePtr& msg, int)
//--------------------------------------------------------------------//
bool CbmDevEventSink::HandleCommand(FairMQMessagePtr& msg, int /*index*/)
{ {
/* // Deserialize command string
std::string sCommand( static_cast< char * >( msg->GetData() ), string command;
msg->GetSize() ); string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
*/ istringstream issCmd(msgStrCmd);
std::string sCommand;
std::string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream issCmd(msgStrCmd);
boost::archive::binary_iarchive inputArchiveCmd(issCmd); boost::archive::binary_iarchive inputArchiveCmd(issCmd);
inputArchiveCmd >> sCommand; inputArchiveCmd >> command;
std::string sCmdTag = sCommand; // Command tag is up to the first blank
size_t charPosDel = sCommand.find(' '); size_t charPosDel = command.find(' ');
if (std::string::npos != charPosDel) { string type = command.substr(0, charPosDel);
sCmdTag = sCommand.substr(0, charPosDel);
} // if( std::string::npos != charPosDel )
if ("EOF" == sCmdTag) { // EOF command
fbReceivedEof = true; if (type == "EOF") {
/// Extract the last TS index and global full TS count // The second substring should be the last timeslice index
if (std::string::npos == charPosDel) { if (charPosDel == string::npos) {
LOG(fatal) << "CbmDevEventSink::HandleCommand => " LOG(error) << "HandleCommand: Incomplete EOF command " << command;
<< "Incomplete EOF command received: " << sCommand;
return false; return false;
} // if( std::string::npos == charPosDel ) }
/// Last TS index
charPosDel++; charPosDel++;
std::string sNext = sCommand.substr(charPosDel); string rest = command.substr(charPosDel);
charPosDel = sNext.find(' '); charPosDel = rest.find(' ');
if (charPosDel == string::npos) {
if (std::string::npos == charPosDel) { LOG(error) << "HandleCommand: Incomplete EOF command " << command;
LOG(fatal) << "CbmDevEventSink::HandleCommand => "
<< "Incomplete EOF command received: " << sCommand;
return false; return false;
} // if( std::string::npos == charPosDel )
fuLastTsIndex = std::stoul(sNext.substr(0, charPosDel));
/// Total TS count
charPosDel++;
fuTotalTsCount = std::stoul(sNext.substr(charPosDel));
LOG(info) << "CbmDevEventSink::HandleCommand => "
<< "Received EOF command with final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
/// End of data: clean save of data + close file + send last state of histos if enabled
if (fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) {
LOG(info) << "CbmDevEventSink::HandleCommand => "
<< "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
Finish();
} // if( fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
} // if( "EOF" == sCmdTag )
else if ("STOP" == sCmdTag) {
/// TODO: different treatment in case of "BAD" ending compared to EOF?
/// Source failure: clean save of received data + close file + send last state of histos if enabled
Finish();
} // else if( "STOP" == sCmdTag )
else {
LOG(warning) << "Unknown command received: " << sCmdTag << " => will be ignored!";
} // else if command not recognized
return true;
}
//--------------------------------------------------------------------//
void CbmDevEventSink::CheckTsQueues()
{
bool bHoleFoundInBothQueues = false;
std::map<uint64_t, CbmEventTimeslice>::iterator itFullTs = fmFullTsStorage.begin();
std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin();
while (!bHoleFoundInBothQueues) {
/// Check if the first TS in the full TS queue is the next one
if (fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first) {
/// Fill all storage variables registers for data output
PrepareTreeEntry((*itFullTs).second);
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = (*itFullTs).first;
fulTsCounter++;
/// Increment iterator
++itFullTs;
continue;
} // if( fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first() )
if (fmFullTsStorage.end() != itFullTs)
LOG(debug) << "CbmDevEventSink::CheckTsQueues => Full TS " << (*itFullTs).first << " VS " << (fuPrevTsIndex + 1);
/// Check if the first TS in the missed TS queue is the next one
if (fvulMissedTsIndices.end() != itMissTs
&& ((0 == fuPrevTsIndex && fuPrevTsIndex == (*itMissTs))
|| ((0 < fulTsCounter || 0 < fulMissedTsCounter) && fuPrevTsIndex + 1 == (*itMissTs)))) {
/// Prepare entry with only dummy TS metadata and empty storage variables
new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
TimesliceMetaData(0, 0, 0, (*itMissTs));
/// Trigger FairRoot manager to dump Tree entry
DumpTreeEntry();
/// Update counters
fuPrevTsIndex = (*itMissTs);
fulMissedTsCounter++;
/// Increment iterator
++itMissTs;
continue;
} // if( fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs ) )
if (fvulMissedTsIndices.end() != itMissTs)
LOG(debug) << "CbmDevEventSink::CheckTsQueues => Empty TS " << (*itMissTs) << " VS " << (fuPrevTsIndex + 1);
/// Should be reached only if both queues at the end or hole found in both
bHoleFoundInBothQueues = true;
} // while( !bHoleFoundInBothQueues )
LOG(debug) << "CbmDevEventSink::CheckTsQueues => buffered TS " << fmFullTsStorage.size() << " buffered empties "
<< fvulMissedTsIndices.size();
for (auto it = fmFullTsStorage.begin(); it != fmFullTsStorage.end(); ++it) {
LOG(debug) << "CbmDevEventSink::CheckTsQueues => buffered TS index " << (*it).first;
} }
uint64_t lastTsIndex = std::stoul(rest.substr(0, charPosDel));
/// Delete the processed entries // The third substring should be the timeslice count
fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs); charPosDel++;
fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs); uint64_t numTs = std::stoul(rest.substr(charPosDel));
/// End of data: clean save of data + close file + send last state of histos if enabled // Log
if (fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount) { LOG(info) << "HandleCommand: Received EOF command with final TS index " << lastTsIndex << " and total number of TS "
LOG(info) << "CbmDevEventSink::CheckTsQueues => " << numTs;
<< "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
Finish(); Finish();
} // if( fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount ) } //? EOF
}
//--------------------------------------------------------------------//
void CbmDevEventSink::PrepareTreeEntry(CbmEventTimeslice unpTs)
{
/// FIXME: poor man solution with lots of data copy until we undertsnad how to properly deal
/// with FairMq messages ownership and memory managment
/// FIXME: Not sure if this is the proper way to insert the data
new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
TimesliceMetaData(std::move(unpTs.fTsMetaData));
/// Extract CbmEvent TClonesArray from input message
(*fEventsSel) = std::move(unpTs.GetSelectedData());
}
void CbmDevEventSink::DumpTreeEntry() // STOP command
{ else if (type == "STOP") {
// Unpacked digis + CbmEvent output to root file LOG(info) << "HandleCommand: Received STOP command";
/// FairRunOnline style Finish();
fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime());
fpFairRootMgr->Fill();
fpFairRootMgr->DeleteOldWriteoutBufferData();
/// Clear metadata array
fTimeSliceMetaDataArray->Clear();
/// Clear event vector
fEventsSel->clear();
} }
//--------------------------------------------------------------------// // Unknown command
else {
bool CbmDevEventSink::SendHistoConfAndData() LOG(warning) << "HandleCommand: Unknown command " << type << " => will be ignored!";
{
/// Prepare multiparts message and header
std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
FairMQMessagePtr messageHeader(NewMessage());
BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
FairMQParts partsOut;
partsOut.AddPart(std::move(messageHeader));
for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
partsOut.AddPart(std::move(messageHist));
} // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
partsOut.AddPart(std::move(messageCan));
} // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr msgHistos(NewMessage());
RootSerializer().Serialize(*msgHistos, &fArrayHisto);
partsOut.AddPart(std::move(msgHistos));
/// Send the multi-parts message to the common histogram messages queue
if (Send(partsOut, fsChannelNameHistosInput) < 0) {
LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data";
return false;
} // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
// ResetHistograms(kFALSE);
return true;
} }
bool CbmDevEventSink::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
RootSerializer().Serialize(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
// ResetHistograms(kFALSE);
return true; return true;
} }
// ----------------------------------------------------------------------------
//--------------------------------------------------------------------//
CbmDevEventSink::~CbmDevEventSink()
{
/// FIXME: Add pointers check before delete
/// Close things properly if not alredy done
if (!fbFinishDone) Finish();
/// Clear events vector
fEventsSel->clear();
delete fEventsSel;
delete fpRun;
}
void CbmDevEventSink::Finish()
{
// Clean closure of output to root file
fpFairRootMgr->Write();
// fpFairRootMgr->GetSource()->Close();
fpFairRootMgr->CloseSink();
LOG(info) << "File closed after saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter
<< " full ones and " << fulMissedTsCounter << " missed/empty ones)";
LOG(info) << "Still buffered TS " << fmFullTsStorage.size() << " and still buffered empties "
<< fvulMissedTsIndices.size();
if (kTRUE == fbFillHistos) {
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( kTRUE == fbFillHistos )
ChangeState(fair::mq::Transition::Stop);
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
ChangeState(fair::mq::Transition::End);
fbFinishDone = kTRUE;
}
CbmEventTimeslice::CbmEventTimeslice(FairMQParts& parts) // ----- Handle data in input channel -------------------------------------
bool CbmDevEventSink::HandleData(FairMQParts& parts, int)
{ {
/// Extract unpacked data from input message fNumMessages++;
uint32_t uPartIdx = 0; LOG(debug) << "Received message number " << fNumMessages << " with " << parts.Size() << " parts"
<< ", size0: " << parts.At(0)->GetSize();
if (0 == fNumMessages % 10000) LOG(info) << "Received " << fNumMessages << " messages";
/// TS metadata // --- Extract TimesliceMetaData (part 0) TObject* tempObjectPointer = nullptr;
TObject* tempObjectPointer = nullptr; TObject* tempObjectPointer = nullptr;
RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); RootSerializer().Deserialize(*parts.At(0), tempObjectPointer);
if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) { if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) {
fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectPointer)); (*fTsMetaData) = *(static_cast<TimesliceMetaData*>(tempObjectPointer));
} }
else { else {
LOG(fatal) << "Failed to deserialize the TS metadata"; LOG(fatal) << "Failed to deserialize the TS metadata";
} }
++uPartIdx;
/// Events // --- Extract event vector (part 1)
std::string msgStrEvt(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); std::string msgStrEvt(static_cast<char*>(parts.At(1)->GetData()), (parts.At(1))->GetSize());
std::istringstream issEvt(msgStrEvt); std::istringstream issEvt(msgStrEvt);
boost::archive::binary_iarchive inputArchiveEvt(issEvt); boost::archive::binary_iarchive inputArchiveEvt(issEvt);
inputArchiveEvt >> fvEvents; inputArchiveEvt >> (*fEventVec);
++uPartIdx;
LOG(debug) << "Input event array " << fvEvents.size(); // --- Dump tree entry for this timeslice
} fFairRootMgr->StoreWriteoutBufferData(fFairRootMgr->GetEventTime());
fFairRootMgr->Fill();
fFairRootMgr->DeleteOldWriteoutBufferData();
fEventVec->clear();
CbmEventTimeslice::~CbmEventTimeslice() { fvEvents.clear(); } // --- Timeslice log
LOG(info) << "Processed TS " << fTsMetaData->GetIndex() << " with " << fEventVec->size() << " events";
std::vector<CbmDigiEvent> CbmEventTimeslice::GetSelectedData() { return fvEvents; } return true;
}
// ----------------------------------------------------------------------------
/* Copyright (C) 2020-2022 Facility for Antiproton and Ion Research in Europe, Darmstadt /* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer], Dominik Smith */ Authors: Dominik Smith [committer], Pierre-Alain Loizeau, Volker Friese */
#ifndef CBMDEVICEEVTSINK_H_ #ifndef CBMDEVICEEVTSINK_H_
#define CBMDEVICEEVTSINK_H_ #define CBMDEVICEEVTSINK_H_
/// CBM headers
#include "CbmDigiEvent.h" #include "CbmDigiEvent.h"
#include "CbmEvent.h"
#include "CbmMqTMessage.h"
#include "CbmMuchDigi.h"
#include "CbmPsdDigi.h"
#include "CbmRichDigi.h"
#include "CbmStsDigi.h"
#include "CbmTofDigi.h"
#include "CbmTrdDigi.h"
#include "TimesliceMetaData.h"
/// FAIRROOT headers
#include "FairMQDevice.h"
/// FAIRSOFT headers (geant, boost, ...) #include "FairMQDevice.h"
#include "Rtypes.h"
#include "TClonesArray.h"
#include "TObjArray.h"
/// C/C++ headers
#include <chrono>
#include <map>
#include <vector> #include <vector>
class TFile; class TimesliceMetaData;
class TList;
class TClonesArray;
class FairRunOnline; class FairRunOnline;
class FairRootManager; class FairRootManager;
class CbmEventTimeslice {
/// TODO: rename to CbmTsWithEvents
public:
CbmEventTimeslice(FairMQParts& parts);
~CbmEventTimeslice();
std::vector<CbmDigiEvent> GetSelectedData();
TimesliceMetaData fTsMetaData;
std::vector<CbmDigiEvent> fvEvents;
};
/** @class CbmDefEventSink
** @brief MQ device class to write CbmDigiEvents to a ROOT file
** @author Dominik Smith <d.smith@gsi.de>
**
** Based on previous, similar implementations by P.-A. Loizeau
**
** The event sink device receives data (vector of CbmDigiEvents for a given timeslice) in the
** respective input channel and fills a ROOT tree/file with these data.
**/
class CbmDevEventSink : public FairMQDevice { class CbmDevEventSink : public FairMQDevice {
public: public:
CbmDevEventSink(); /** @brief Constructor **/
CbmDevEventSink() {};
/** @brief Destructor **/
virtual ~CbmDevEventSink(); virtual ~CbmDevEventSink();
protected: protected:
/** @brief Action on command messages
** @param parts Message
** @param flag Not used; ignored
** @return Success
**/
bool HandleCommand(FairMQMessagePtr&, int flag);
/** @brief Action on data messages
** @param parts Message
** @param flag Not used; ignored
** @return Success
*/
bool HandleData(FairMQParts& parts, int flag);
/** @brief Initialization **/
virtual void InitTask(); virtual void InitTask();
bool HandleMissTsData(FairMQMessagePtr&, int);
bool HandleData(FairMQParts&, int);
bool HandleCommand(FairMQMessagePtr&, int); private: // methods
/** @brief Finishing run **/
private:
/// Constants
/// Control flags
Bool_t fbFillHistos = false; //! Switch ON/OFF filling of histograms
Bool_t fbFinishDone = false; //! Keep track of whether the Finish was already called
/// User settings parameters
/// Algo enum settings
std::string fsOutputFileName = "mcbm_digis_events.root";
/// message queues
std::string fsChannelNameMissedTs = "missedts";
std::string fsChannelNameDataInput = "events";
std::string fsChannelNameCommands = "commands";
std::string fsChannelNameHistosInput = "histogram-in";
/// Histograms management
uint32_t fuPublishFreqTs = 100;
double_t fdMinPublishTime = 0.5;
double_t fdMaxPublishTime = 5.0;
/// List of MQ channels names
std::vector<std::string> fsAllowedChannels = {fsChannelNameDataInput};
/// Statistics & missed TS detection
uint64_t fuPrevTsIndex = 0;
uint64_t fulNumMessages = 0;
uint64_t fulTsCounter = 0;
uint64_t fulMissedTsCounter = 0;
std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now();
/// Control Commands reception
bool fbReceivedEof = false;
uint64_t fuLastTsIndex = 0;
uint64_t fuTotalTsCount = 0;
/// Data reception
/// TS MetaData storage
TClonesArray* fTimeSliceMetaDataArray = nullptr; //!
TimesliceMetaData* fTsMetaData = nullptr;
/// CbmEvents
std::vector<CbmDigiEvent>* fEventsSel = nullptr; //! output container of CbmEvents
/// Storage for re-ordering
/// Missed TS vector
std::vector<uint64_t> fvulMissedTsIndices = {};
/// Buffered TS
std::map<uint64_t, CbmEventTimeslice> fmFullTsStorage = {};
/// Data storage
FairRunOnline* fpRun = nullptr;
FairRootManager* fpFairRootMgr = nullptr;
/// Array of histograms to send to the histogram server
TObjArray fArrayHisto = {};
/// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {};
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {};
/// Flag indicating whether the histograms and canvases configurations were already published
bool fbConfigSent = false;
/// Internal methods
bool IsChannelNameAllowed(std::string channelName);
bool InitHistograms();
void CheckTsQueues();
void PrepareTreeEntry(CbmEventTimeslice unpTs);
void DumpTreeEntry();
bool SendHistoConfAndData();
bool SendHistograms();
void Finish(); void Finish();
private: // members
// --- Counters and status flags
size_t fNumMessages = 0; ///< Number of received data messages
size_t fNumTs = 0; ///< Number of processed timeslices
uint64_t fPrevTsIndex = 0; ///< Index of last processed timeslice
bool fFinishDone = false; ///< Keep track of whether the Finish method was already called
TimesliceMetaData* fTsMetaData = nullptr; ///< Data output: TS meta data
std::vector<CbmDigiEvent>* fEventVec = nullptr; ///< Data output: events
FairRunOnline* fFairRun = nullptr; ///< FairRunOnline to instantiate FairRootManager
FairRootManager* fFairRootMgr = nullptr; ///< FairRootManager used for ROOT file I/O
}; };
#endif /* CBMDEVICEEVTSINK_H_ */ #endif /* CBMDEVICEEVTSINK_H_ */
...@@ -14,22 +14,12 @@ using namespace std; ...@@ -14,22 +14,12 @@ using namespace std;
void addCustomOptions(bpo::options_description& options) void addCustomOptions(bpo::options_description& options)
{ {
options.add_options()("StoreFullTs", bpo::value<bool>()->default_value(false), options.add_options()("OutFileName", bpo::value<std::string>()->default_value(""),
"Store digis vectors with full TS in addition to selected events if true");
options.add_options()("OutFileName", bpo::value<std::string>()->default_value("mcbm_digis_events.root"),
"Name (full or relative path) of the output .root file "); "Name (full or relative path) of the output .root file ");
options.add_options()("EvtNameIn", bpo::value<std::string>()->default_value("events"), options.add_options()("ChannelNameDataInput", bpo::value<std::string>()->default_value("events"),
"MQ channel name for built events"); "MQ channel name for digi events");
options.add_options()("FillHistos", bpo::value<bool>()->default_value(false), options.add_options()("ChannelNameCommands", bpo::value<std::string>()->default_value("commands"),
"Fill histograms and send them to histo server if true"); "MQ channel name for commands");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
} }
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDevEventSink(); } FairMQDevicePtr getDevice(const FairMQProgOptions&) { return new CbmDevEventSink(); }
...@@ -194,17 +194,10 @@ EVTSINK="MqDevEventSink" ...@@ -194,17 +194,10 @@ EVTSINK="MqDevEventSink"
EVTSINK+=" --id evtsink1" EVTSINK+=" --id evtsink1"
EVTSINK+=" --severity info" EVTSINK+=" --severity info"
#EVTSINK+=" --severity debug" #EVTSINK+=" --severity debug"
#EVTSINK+=" --StoreFullTs 1"
EVTSINK+=" --OutFileName mcbm_digis_events.root" EVTSINK+=" --OutFileName mcbm_digis_events.root"
EVTSINK+=" --FillHistos false" EVTSINK+=" --ChannelNameDataInput events"
EVTSINK+=" --PubFreqTs $_pubfreqts"
EVTSINK+=" --PubTimeMin $_pubminsec"
EVTSINK+=" --PubTimeMax $_pubmaxsec"
EVTSINK+=" --EvtNameIn events"
EVTSINK+=" --channel-config name=events,type=pull,method=bind,transport=zeromq,address=tcp://127.0.0.1:11557" EVTSINK+=" --channel-config name=events,type=pull,method=bind,transport=zeromq,address=tcp://127.0.0.1:11557"
EVTSINK+=" --channel-config name=missedts,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11006"
EVTSINK+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007" EVTSINK+=" --channel-config name=commands,type=sub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11007"
EVTSINK+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666"
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX # Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log # with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
EVTSINK_LOG="evtsink1_$LOGFILETAG" EVTSINK_LOG="evtsink1_$LOGFILETAG"
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment